Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor AsyncUploader to replace Engine.Unit with ComponentManager #6809

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/mempool/entity"
"github.com/onflow/flow-go/storage"
Expand All @@ -34,6 +35,7 @@ type BadgerRetryableUploaderWrapper struct {
results storage.ExecutionResults
transactionResults storage.TransactionResults
uploadStatusStore storage.ComputationResultUploadStatus
component.Component
}

func NewBadgerRetryableUploaderWrapper(
Expand Down Expand Up @@ -99,17 +101,10 @@ func NewBadgerRetryableUploaderWrapper(
results: results,
transactionResults: transactionResults,
uploadStatusStore: uploadStatusStore,
Component: uploader, // delegate to the AsyncUploader
}
}

func (b *BadgerRetryableUploaderWrapper) Ready() <-chan struct{} {
return b.uploader.Ready()
}

func (b *BadgerRetryableUploaderWrapper) Done() <-chan struct{} {
return b.uploader.Done()
}

func (b *BadgerRetryableUploaderWrapper) Upload(computationResult *execution.ComputationResult) error {
if computationResult == nil || computationResult.ExecutableBlock == nil ||
computationResult.ExecutableBlock.Block == nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uploader

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
executionDataMock "github.com/onflow/flow-go/module/executiondatasync/execution_data/mock"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/mempool/entity"
"github.com/onflow/flow-go/module/metrics"

Expand All @@ -26,6 +28,8 @@ import (
)

func Test_Upload_invoke(t *testing.T) {
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
defer cancel()
wg := sync.WaitGroup{}
uploaderCalled := false

Expand All @@ -40,7 +44,7 @@ func Test_Upload_invoke(t *testing.T) {
1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{})

testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader)
defer testRetryableUploaderWrapper.Done()
testRetryableUploaderWrapper.Start(ctx)

// nil input - no call to Upload()
err := testRetryableUploaderWrapper.Upload(nil)
Expand All @@ -58,6 +62,8 @@ func Test_Upload_invoke(t *testing.T) {
}

func Test_RetryUpload(t *testing.T) {
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
defer cancel()
wg := sync.WaitGroup{}
wg.Add(1)
uploaderCalled := false
Expand All @@ -72,7 +78,7 @@ func Test_RetryUpload(t *testing.T) {
1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{})

testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader)
defer testRetryableUploaderWrapper.Done()
testRetryableUploaderWrapper.Start(ctx)

err := testRetryableUploaderWrapper.RetryUpload()
wg.Wait()
Expand All @@ -82,6 +88,8 @@ func Test_RetryUpload(t *testing.T) {
}

func Test_AsyncUploaderCallback(t *testing.T) {
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
defer cancel()
wgUploadCalleded := sync.WaitGroup{}
wgUploadCalleded.Add(1)

Expand All @@ -95,7 +103,7 @@ func Test_AsyncUploaderCallback(t *testing.T) {
1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{})

testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader)
defer testRetryableUploaderWrapper.Done()
testRetryableUploaderWrapper.Start(ctx)

testComputationResult := createTestComputationResult()
err := testRetryableUploaderWrapper.Upload(testComputationResult)
Expand Down
99 changes: 63 additions & 36 deletions engine/execution/ingestion/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/utils/logging"

"github.com/sethvargo/go-retry"
Expand All @@ -26,74 +27,100 @@ func NewAsyncUploader(uploader Uploader,
maxRetryNumber uint64,
log zerolog.Logger,
metrics module.ExecutionMetrics) *AsyncUploader {
return &AsyncUploader{
unit: engine.NewUnit(),
a := &AsyncUploader{
uploader: uploader,
log: log.With().Str("component", "block_data_uploader").Logger(),
metrics: metrics,
retryInitialTimeout: retryInitialTimeout,
maxRetryNumber: maxRetryNumber,
queue: make(chan *execution.ComputationResult, 100),
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
}
builder := component.NewComponentManagerBuilder()
for i := 0; i < 3; i++ {
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
builder.AddWorker(a.UploadWorker)
}
a.cm = builder.Build()
a.Component = a.cm
return a
}

// AsyncUploader wraps up another Uploader instance and make its upload asynchronous
type AsyncUploader struct {
module.ReadyDoneAware
unit *engine.Unit
uploader Uploader
log zerolog.Logger
metrics module.ExecutionMetrics
retryInitialTimeout time.Duration
maxRetryNumber uint64
onComplete OnCompleteFunc // callback function called after Upload is completed
queue chan *execution.ComputationResult
cm *component.ComponentManager
component.Component
}

func (a *AsyncUploader) Ready() <-chan struct{} {
return a.unit.Ready()
}

func (a *AsyncUploader) Done() <-chan struct{} {
return a.unit.Done()
// UploadWorker implements a component worker which asynchronously uploads computation results
// from the execution node (after a block is executed) to storage such as a GCP bucket or S3 bucket.
func (a *AsyncUploader) UploadWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()

done := ctx.Done()
for {
select {
case <-done:
return
case computationResult := <-a.queue:
a.UploadTask(ctx, computationResult)
}
}
}

func (a *AsyncUploader) SetOnCompleteCallback(onComplete OnCompleteFunc) {
a.onComplete = onComplete
}

// Upload adds the computation result to a queue to be processed asynchronously by workers,
// ensuring that multiple uploads can be run in parallel.
// No errors expected during normal operation.
func (a *AsyncUploader) Upload(computationResult *execution.ComputationResult) error {
a.queue <- computationResult
return nil
}

// UploadTask implements retrying for uploading computation results.
// When the upload is complete, the callback will be called with the result (for example,
// to record that the upload was successful) and any error.
// No errors expected during normal operation.
func (a *AsyncUploader) UploadTask(ctx context.Context, computationResult *execution.ComputationResult) {
backoff := retry.NewFibonacci(a.retryInitialTimeout)
backoff = retry.WithMaxRetries(a.maxRetryNumber, backoff)

a.unit.Launch(func() {
a.metrics.ExecutionBlockDataUploadStarted()
start := time.Now()

a.log.Debug().Msgf("computation result of block %s is being uploaded",
computationResult.ExecutableBlock.ID().String())
a.metrics.ExecutionBlockDataUploadStarted()
start := time.Now()

err := retry.Do(a.unit.Ctx(), backoff, func(ctx context.Context) error {
err := a.uploader.Upload(computationResult)
if err != nil {
a.log.Warn().Err(err).Msg("error while uploading block data, retrying")
}
return retry.RetryableError(err)
})
a.log.Debug().Msgf("computation result of block %s is being uploaded",
computationResult.ExecutableBlock.ID().String())

err := retry.Do(ctx, backoff, func(ctx context.Context) error {
err := a.uploader.Upload(computationResult)
if err != nil {
a.log.Error().Err(err).
Hex("block_id", logging.Entity(computationResult.ExecutableBlock)).
Msg("failed to upload block data")
} else {
a.log.Debug().Msgf("computation result of block %s was successfully uploaded",
computationResult.ExecutableBlock.ID().String())
a.log.Warn().Err(err).Msg("error while uploading block data, retrying")
}
return retry.RetryableError(err)
})

a.metrics.ExecutionBlockDataUploadFinished(time.Since(start))
// We only log upload errors here because the errors originate from an external cloud provider
// and the upload success is not critical to correct continued operation of the node
if err != nil {
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
a.log.Error().Err(err).
Hex("block_id", logging.Entity(computationResult.ExecutableBlock)).
Msg("failed to upload block data")
} else {
a.log.Debug().Msgf("computation result of block %s was successfully uploaded",
computationResult.ExecutableBlock.ID().String())
}

if a.onComplete != nil {
a.onComplete(computationResult, err)
}
})
return nil
a.metrics.ExecutionBlockDataUploadFinished(time.Since(start))

if a.onComplete != nil {
a.onComplete(computationResult, err)
}
}
31 changes: 22 additions & 9 deletions engine/execution/ingestion/uploader/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package uploader

import (
"bytes"
"context"
"fmt"
"runtime/debug"
"sync"
Expand All @@ -13,17 +14,17 @@ import (
"go.uber.org/atomic"

"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/engine/execution/state/unittest"
exeunittest "github.com/onflow/flow-go/engine/execution/state/unittest"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
testutils "github.com/onflow/flow-go/utils/unittest"
unittest2 "github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest"
)

func Test_AsyncUploader(t *testing.T) {

computationResult := unittest.ComputationResultFixture(
computationResult := exeunittest.ComputationResultFixture(
t,
testutils.IdentifierFixture(),
unittest.IdentifierFixture(),
nil)

t.Run("uploads are run in parallel and emit metrics", func(t *testing.T) {
Expand All @@ -46,6 +47,8 @@ func Test_AsyncUploader(t *testing.T) {

metrics := &DummyCollector{}
async := NewAsyncUploader(uploader, 1*time.Nanosecond, 1, zerolog.Nop(), metrics)
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
async.Start(ctx)

err := async.Upload(computationResult)
require.NoError(t, err)
Expand All @@ -63,6 +66,8 @@ func Test_AsyncUploader(t *testing.T) {
wgContinueUpload.Done() //release all

// shut down component
cancel()
unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader did not finish in time")
<-async.Done()

require.Equal(t, int64(0), metrics.Counter.Load())
Expand All @@ -89,6 +94,9 @@ func Test_AsyncUploader(t *testing.T) {
}

async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{})
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
async.Start(ctx)
defer cancel()

err := async.Upload(computationResult)
require.NoError(t, err)
Expand All @@ -107,7 +115,7 @@ func Test_AsyncUploader(t *testing.T) {
// 2. shut down async uploader right after upload initiated (not completed)
// 3. assert that upload called only once even when trying to use retry mechanism
t.Run("stopping component stops retrying", func(t *testing.T) {
testutils.SkipUnless(t, testutils.TEST_FLAKY, "flaky")
unittest.SkipUnless(t, unittest.TEST_FLAKY, "flaky")

callCount := 0
t.Log("test started grID:", string(bytes.Fields(debug.Stack())[1]))
Expand Down Expand Up @@ -151,6 +159,8 @@ func Test_AsyncUploader(t *testing.T) {
}
t.Log("about to create NewAsyncUploader grID:", string(bytes.Fields(debug.Stack())[1]))
async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{})
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
async.Start(ctx)
t.Log("about to call async.Upload() grID:", string(bytes.Fields(debug.Stack())[1]))
err := async.Upload(computationResult) // doesn't matter what we upload
require.NoError(t, err)
Expand All @@ -163,11 +173,11 @@ func Test_AsyncUploader(t *testing.T) {

// stop component and check that it's fully stopped
t.Log("about to initiate shutdown grID: ", string(bytes.Fields(debug.Stack())[1]))
c := async.Done()
cancel()
t.Log("about to notify upload() that shutdown started and can continue uploading grID:", string(bytes.Fields(debug.Stack())[1]))
wgShutdownStarted.Done()
t.Log("about to check async done channel is closed grID:", string(bytes.Fields(debug.Stack())[1]))
unittest2.RequireCloseBefore(t, c, 1*time.Second, "async uploader not closed in time")
unittest.RequireCloseBefore(t, async.Done(), 1*time.Second, "async uploader not closed in time")

t.Log("about to check if callCount is 1 grID:", string(bytes.Fields(debug.Stack())[1]))
require.Equal(t, 1, callCount)
Expand All @@ -190,12 +200,15 @@ func Test_AsyncUploader(t *testing.T) {
async.SetOnCompleteCallback(func(computationResult *execution.ComputationResult, err error) {
onCompleteCallbackCalled = true
})
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
async.Start(ctx)

err := async.Upload(computationResult)
require.NoError(t, err)

wgUploadCalleded.Wait()
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
<-async.Done()
cancel()
unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time")

require.True(t, onCompleteCallbackCalled)
})
Expand Down
18 changes: 17 additions & 1 deletion module/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,25 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down")
// channels that close when startup and shutdown have completed.
// Once Start has been called, the channel returned by Done must close eventually,
// whether that be because of a graceful shutdown or an irrecoverable error.
// See also ComponentManager below.
type Component interface {
module.Startable
module.ReadyDoneAware
// Ready returns a ready channel that is closed once startup has completed.
// Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component,
// and only exposes information about the component's state.
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
// To start the component, instead use the Start() method.
// Note that the ready channel may never close if errors are encountered during startup,
// or if shutdown has already commenced before startup is complete.
// This should be an idempotent method.
Ready() <-chan struct{}

// Done returns a done channel that is closed once shutdown has completed.
// Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component,
// and only exposes information about the component's state.
// To shutdown the component, instead cancel the context that was passed to Start().
// Implementations must close the done channel even if errors are encountered during shutdown.
// This should be an idempotent method.
Done() <-chan struct{}
}

type ComponentFactory func() (Component, error)
Expand Down
Loading