Skip to content

Commit

Permalink
Refactor BlockPoller to support generics and client handling
Browse files Browse the repository at this point in the history
Refactor BlockPoller to support generic clients with type parameter 'C'. Updated associated methods, test functions, and interfaces to accommodate this change. This enhancement includes moving the retry mechanism and client handling to the poller, improving flexibility for different client types.
  • Loading branch information
billettc committed Nov 14, 2024
1 parent 06b587c commit 2901a03
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 81 deletions.
6 changes: 2 additions & 4 deletions blockpoller/fetcher.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package blockpoller

import (
"context"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
)

type BlockFetcher interface {
type BlockFetcher[C any] interface {
IsBlockAvailable(requestedSlot uint64) bool
Fetch(ctx context.Context, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
Fetch(client C, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
}
17 changes: 8 additions & 9 deletions blockpoller/init_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockpoller

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -27,31 +26,31 @@ type TestBlock struct {
send *pbbstream.Block
}

var _ BlockFetcher = &TestBlockFetcher{}
var _ BlockFetcher[any] = &TestBlockFetcher[any]{}

type TestBlockFetcher struct {
type TestBlockFetcher[C any] struct {
t *testing.T
blocks []*TestBlock
idx uint64
completed bool
}

func newTestBlockFetcher(t *testing.T, blocks []*TestBlock) *TestBlockFetcher {
return &TestBlockFetcher{
func newTestBlockFetcher[C any](t *testing.T, blocks []*TestBlock) *TestBlockFetcher[C] {
return &TestBlockFetcher[C]{
t: t,
blocks: blocks,
}
}

func (b *TestBlockFetcher) PollingInterval() time.Duration {
func (b *TestBlockFetcher[C]) PollingInterval() time.Duration {
return 0
}

func (b *TestBlockFetcher) IsBlockAvailable(requestedSlot uint64) bool {
func (b *TestBlockFetcher[C]) IsBlockAvailable(requestedSlot uint64) bool {
return true
}

func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, bool, error) {
func (b *TestBlockFetcher[C]) Fetch(c C, blkNum uint64) (*pbbstream.Block, bool, error) {
if len(b.blocks) == 0 {
assert.Fail(b.t, fmt.Sprintf("should not have fetched block %d", blkNum))
}
Expand All @@ -69,7 +68,7 @@ func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.B
return blkToSend, false, nil
}

func (b *TestBlockFetcher) check(t *testing.T) {
func (b *TestBlockFetcher[C]) check(t *testing.T) {
t.Helper()
require.Equal(b.t, uint64(len(b.blocks)), b.idx, "we should have fetched all %d blocks, only fired %d blocks", len(b.blocks), b.idx)
}
Expand Down
18 changes: 9 additions & 9 deletions blockpoller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ package blockpoller

import "go.uber.org/zap"

type Option func(*BlockPoller)
type Option[C any] func(*BlockPoller[C])

func WithBlockFetchRetryCount(v uint64) Option {
return func(p *BlockPoller) {
func WithBlockFetchRetryCount[C any](v uint64) Option[C] {
return func(p *BlockPoller[C]) {
p.fetchBlockRetryCount = v
}
}

func WithStoringState(stateStorePath string) Option {
return func(p *BlockPoller) {
func WithStoringState[C any](stateStorePath string) Option[C] {
return func(p *BlockPoller[C]) {
p.stateStorePath = stateStorePath
}
}

// IgnoreCursor ensures the poller will ignore the cursor and start from the startBlockNum
// the cursor will still be saved as the poller progresses
func IgnoreCursor() Option {
return func(p *BlockPoller) {
func IgnoreCursor[C any]() Option[C] {
return func(p *BlockPoller[C]) {
p.ignoreCursor = true
}
}

func WithLogger(logger *zap.Logger) Option {
return func(p *BlockPoller) {
func WithLogger[C any](logger *zap.Logger) Option[C] {
return func(p *BlockPoller[C]) {
p.logger = logger
}
}
122 changes: 80 additions & 42 deletions blockpoller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"github.com/streamingfast/bstream"

"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/derr"
"github.com/streamingfast/dhammer"
"github.com/streamingfast/firehose-core/internal/utils"
"github.com/streamingfast/firehose-core/rpc"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)
Expand All @@ -27,17 +27,19 @@ func newBlock(block2 *pbbstream.Block) *block {
return &block{block2, false}
}

type BlockPoller struct {
type BlockPoller[C any] struct {
*shutter.Shutter
startBlockNumGate uint64
fetchBlockRetryCount uint64
stateStorePath string
ignoreCursor bool
forceFinalityAfterBlocks *uint64

blockFetcher BlockFetcher
blockFetcher BlockFetcher[C]
blockHandler BlockHandler
forkDB *forkable.ForkDB
clients *rpc.Clients[C]

forkDB *forkable.ForkDB

logger *zap.Logger

Expand All @@ -47,16 +49,18 @@ type BlockPoller struct {
optimisticallyPolledBlocksLock sync.Mutex
}

func New(
blockFetcher BlockFetcher,
func New[C any](
blockFetcher BlockFetcher[C],
blockHandler BlockHandler,
opts ...Option,
) *BlockPoller {
clients *rpc.Clients[C],
opts ...Option[C],
) *BlockPoller[C] {

b := &BlockPoller{
b := &BlockPoller[C]{
Shutter: shutter.New(),
blockFetcher: blockFetcher,
blockHandler: blockHandler,
clients: clients,
fetchBlockRetryCount: math.MaxUint64,
logger: zap.NewNop(),
forceFinalityAfterBlocks: utils.GetEnvForceFinalityAfterBlocks(),
Expand All @@ -69,7 +73,7 @@ func New(
return b
}

func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, blockFetchBatchSize int) error {
func (p *BlockPoller[C]) Run(firstStreamableBlockNum uint64, stopBlock *uint64, blockFetchBatchSize int) error {
p.startBlockNumGate = firstStreamableBlockNum
p.logger.Info("starting poller",
zap.Uint64("first_streamable_block", firstStreamableBlockNum),
Expand All @@ -83,14 +87,25 @@ func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, b
}
p.forkDB = forkDB

return p.run(resolvedStartBlock, blockFetchBatchSize)
resolveStopBlock := uint64(math.MaxUint64)
if stopBlock != nil {
resolveStopBlock = *stopBlock
}

return p.run(resolvedStartBlock, resolveStopBlock, blockFetchBatchSize)
}

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSize int) (err error) {
func (p *BlockPoller[C]) run(resolvedStartBlock bstream.BlockRef, stopBlock uint64, blockFetchBatchSize int) (err error) {
currentCursor := &cursor{state: ContinuousSegState, logger: p.logger}
blockToFetch := resolvedStartBlock.Num()
var hashToFetch *string
for {

if blockToFetch >= stopBlock {
p.logger.Info("stop block reach", zap.Uint64("stop_block", stopBlock))
return nil
}

if p.IsTerminating() {
p.logger.Info("block poller is terminating")
}
Expand Down Expand Up @@ -133,7 +148,7 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSi
}
}

func (p *BlockPoller) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) {
func (p *BlockPoller[C]) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) {
p.logger.Info("processing block", zap.Stringer("block", block.AsRef()), zap.Uint64("lib_num", block.LibNum))
if block.Number < p.forkDB.LIBNum() {
panic(fmt.Errorf("unexpected error block %d is below the current LIB num %d. There should be no re-org above the current LIB num", block.Number, p.forkDB.LIBNum()))
Expand Down Expand Up @@ -189,31 +204,40 @@ type BlockItem struct {
skipped bool
}

func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error {
func (p *BlockPoller[C]) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error {
p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}
p.fetching = true

nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) {
var blockItem *BlockItem
err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
b, skip, err := p.blockFetcher.Fetch(ctx, blockToFetch)
if err != nil {
return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err)
}
if skip {
blockItem = &BlockItem{
blockNumber: blockToFetch,
block: nil,
skipped: true,

bi, err := rpc.WithClients(p.clients, func(client C) (*BlockItem, error) {
b, skipped, err := p.blockFetcher.Fetch(client, blockToFetch)
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockToFetch, err)
}
return nil
}
//todo: add block to cache
blockItem = &BlockItem{
blockNumber: blockToFetch,
block: b,
skipped: false,

if skipped {
return &BlockItem{
blockNumber: blockToFetch,
block: nil,
skipped: true,
}, nil
}

return &BlockItem{
blockNumber: blockToFetch,
block: b,
skipped: false,
}, nil
})

if err != nil {
return fmt.Errorf("fetching block %d with retry : %w", blockToFetch, err)
}
blockItem = bi

return nil

})
Expand Down Expand Up @@ -272,7 +296,7 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch
return nil
}

func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem {
func (p *BlockPoller[C]) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem {
p.logger.Info("requesting block", zap.Uint64("block_num", blockNumber))
requestedBlock := make(chan *BlockItem)

Expand Down Expand Up @@ -314,24 +338,38 @@ func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int)
return requestedBlock
}

func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
type FetchResponse struct {
Block *pbbstream.Block
Skipped bool
}

func (p *BlockPoller[C]) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
p.logger.Info("fetching block with hash", zap.Uint64("block_num", blkNum), zap.String("hash", hash))
_ = hash //todo: hash will be used to fetch block from cache

p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}

var out *pbbstream.Block
var skipped bool

err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
//todo: get block from cache
var fetchErr error
out, skipped, fetchErr = p.blockFetcher.Fetch(ctx, blkNum)
if fetchErr != nil {
return fmt.Errorf("unable to fetch block %d: %w", blkNum, fetchErr)
}
if skipped {
return nil
br, err := rpc.WithClients(p.clients, func(client C) (br *FetchResponse, err error) {
b, skipped, err := p.blockFetcher.Fetch(client, blkNum)
if err != nil {
return nil, fmt.Errorf("fetching block block %d: %w", blkNum, err)
}
return &FetchResponse{
Block: b,
Skipped: skipped,
}, nil
})

if err != nil {
return fmt.Errorf("fetching block with retry %d: %w", blkNum, err)
}

out = br.Block
skipped = br.Skipped
return nil
})

Expand All @@ -350,7 +388,7 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream
return out, nil
}

func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error {
func (p *BlockPoller[C]) fireCompleteSegment(blocks []*forkable.Block) error {
for _, blk := range blocks {
b := blk.Object.(*block)
if _, err := p.fire(b); err != nil {
Expand All @@ -360,7 +398,7 @@ func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error {
return nil
}

func (p *BlockPoller) fire(blk *block) (bool, error) {
func (p *BlockPoller[C]) fire(blk *block) (bool, error) {
if blk.fired {
return false, nil
}
Expand Down
Loading

0 comments on commit 2901a03

Please sign in to comment.