diff --git a/pkg/indexer/sequencer/listen.go b/pkg/indexer/sequencer/listen.go deleted file mode 100644 index 5688ec3..0000000 --- a/pkg/indexer/sequencer/listen.go +++ /dev/null @@ -1,36 +0,0 @@ -package sequencer - -import ( - "context" - "github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api" -) - -func (s *Module) listen(ctx context.Context) { - s.Log.Info().Msg("module started") - - input := s.MustInput(InputName) - - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-input.Listen(): - if !ok { - s.Log.Warn().Msg("can't read message from input, it was drained and closed") - s.MustOutput(StopOutput).Push(struct{}{}) - return - } - block, ok := msg.(*api.SqdBlockResponse) - if !ok { - s.Log.Warn().Msgf("invalid message type: %T", msg) - continue - } - - s.buffer[block.Header.Number] = block - - s.Log.Info(). - Uint64("ID", block.Header.Number). - Msg("received block") - } - } -} diff --git a/pkg/indexer/sequencer/sequencer.go b/pkg/indexer/sequencer/sequencer.go deleted file mode 100644 index baacbc1..0000000 --- a/pkg/indexer/sequencer/sequencer.go +++ /dev/null @@ -1,47 +0,0 @@ -package sequencer - -import ( - "context" - "github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api" - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "sync" -) - -type Module struct { - modules.BaseModule - input <-chan *api.SqdBlockResponse - output chan *api.SqdBlockResponse - buffer map[uint64]*api.SqdBlockResponse - nextNumber uint64 - mu sync.Mutex -} - -var _ modules.Module = (*Module)(nil) - -const ( - InputName = "blocks_input" - OutputName = "blocks_output" - StopOutput = "stop" -) - -func New() *Module { - m := Module{ - BaseModule: modules.New("sequencer"), - } - m.CreateInputWithCapacity(InputName, 128) - m.CreateOutput(OutputName) - m.CreateOutput(StopOutput) - - return &m -} - -func (s *Module) Start(ctx context.Context) { - s.Log.Info().Msg("starting...") - s.G.GoCtx(ctx, s.listen) -} - -func (s *Module) Close() error { - s.Log.Info().Msg("closing...") - s.G.Wait() - return nil -} diff --git a/pkg/indexer/sqd_receiver/api/api.go b/pkg/indexer/sqd_receiver/api/api.go index 7339bb6..ada70b4 100644 --- a/pkg/indexer/sqd_receiver/api/api.go +++ b/pkg/indexer/sqd_receiver/api/api.go @@ -35,13 +35,13 @@ func (s *Subsquid) GetWorkerUrl(_ context.Context, startLevel uint64) (string, e return response.Body().AsString() } -func (s *Subsquid) GetBlocks(_ context.Context, from uint64, workerUrl string) ([]*SqdBlockResponse, error) { +func (s *Subsquid) GetBlocks(_ context.Context, from, to uint64, workerUrl string) ([]*SqdBlockResponse, error) { var workerClient = fastshot.NewClient(workerUrl). Build() response, err := workerClient.POST(""). Header().AddContentType(mime.JSON). - Body().AsJSON(NewRequest(from)). + Body().AsJSON(NewRequest(from, to)). Send() if err != nil { diff --git a/pkg/indexer/sqd_receiver/api/request.go b/pkg/indexer/sqd_receiver/api/request.go index df365a8..5d3dfef 100644 --- a/pkg/indexer/sqd_receiver/api/request.go +++ b/pkg/indexer/sqd_receiver/api/request.go @@ -3,6 +3,7 @@ package api type Request struct { Type string `json:"type"` FromBlock uint64 `json:"fromBlock"` + ToBlock uint64 `json:"toBlock,omitempty"` IncludeAllBlocks bool `json:"includeAllBlocks"` Fields Fields `json:"fields,omitempty"` StateUpdates []map[string]any `json:"stateUpdates,omitempty"` @@ -89,10 +90,11 @@ type TransactionWithTrace struct { Events bool `json:"events"` } -func NewRequest(fromLevel uint64) *Request { +func NewRequest(fromLevel uint64, toLevel uint64) *Request { return &Request{ Type: "starknet", FromBlock: fromLevel, + ToBlock: toLevel, IncludeAllBlocks: true, Fields: Fields{ Block: BlockField{ diff --git a/pkg/indexer/sqd_receiver/receiver.go b/pkg/indexer/sqd_receiver/receiver.go index 9e7c2e7..2cce229 100644 --- a/pkg/indexer/sqd_receiver/receiver.go +++ b/pkg/indexer/sqd_receiver/receiver.go @@ -119,7 +119,7 @@ func (r *Receiver) checkQueue(ctx context.Context) bool { case <-ctx.Done(): return true default: - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 10) } } @@ -187,6 +187,28 @@ func (r *Receiver) GetSqdWorkerRanges(ctx context.Context, fromLevel, height uin return result, nil } +func (r *Receiver) SplitWorkerRanger(workerRanges []BlocksToWorker) []BlocksToWorker { + var result []BlocksToWorker + batchSize := uint64(200) + + for _, worker := range workerRanges { + for start := worker.From; start <= worker.To; start += batchSize { + end := start + batchSize - 1 + if end > worker.To { + end = worker.To + } + + result = append(result, BlocksToWorker{ + From: start, + To: end, + WorkerURL: worker.WorkerURL, + }) + } + } + + return result +} + func (r *Receiver) Level() uint64 { r.mx.RLock() defer r.mx.RUnlock() diff --git a/pkg/indexer/sqd_receiver/sequencer.go b/pkg/indexer/sqd_receiver/sequencer.go index 9074b2f..cca3926 100644 --- a/pkg/indexer/sqd_receiver/sequencer.go +++ b/pkg/indexer/sqd_receiver/sequencer.go @@ -25,15 +25,7 @@ func (r *Receiver) sequencer(ctx context.Context) { b, ok := orderedBlocks[currentBlock] for ok { r.MustOutput(OutputName).Push(b) - r.Log.Info(). - Uint64("ID", b.Header.Number). - Msg("sended block") - r.setLevel(currentBlock) - r.Log.Debug(). - Uint64("height", currentBlock). - Msg("put in order block") - delete(orderedBlocks, currentBlock) currentBlock += 1 diff --git a/pkg/indexer/sqd_receiver/sync.go b/pkg/indexer/sqd_receiver/sync.go index 58fa831..ed3c5ff 100644 --- a/pkg/indexer/sqd_receiver/sync.go +++ b/pkg/indexer/sqd_receiver/sync.go @@ -16,11 +16,6 @@ func (r *Receiver) sync(ctx context.Context) { Uint64("indexer_height", r.getIndexerHeight()). Uint64("node_height", head). Msg("rollback detected by block height") - - // todo: makeRollback - //if err := f.indexer.makeRollback(ctx, head); err != nil { - // return errors.Wrap(err, "makeRollback") - //} } r.log.Info(). @@ -41,14 +36,10 @@ func (r *Receiver) sync(ctx context.Context) { return } - for _, blockRange := range blocksToWorker { + for _, blockRange := range r.SplitWorkerRanger(blocksToWorker) { select { case <-ctx.Done(): return - // todo: f.indexer.rollback - //case <-f.indexer.rollback: - // log.Info().Msg("stop receiving blocks") - // return nil default: if r.checkQueue(ctx) { return diff --git a/pkg/indexer/sqd_receiver/worker.go b/pkg/indexer/sqd_receiver/worker.go index 388659e..f81f54d 100644 --- a/pkg/indexer/sqd_receiver/worker.go +++ b/pkg/indexer/sqd_receiver/worker.go @@ -2,49 +2,39 @@ package sqd_receiver import ( "context" - "github.com/pkg/errors" - "time" + "github.com/rs/zerolog/log" ) func (r *Receiver) worker(ctx context.Context, blockRange BlocksToWorker) { - r.log.Info(). - Str("URL", blockRange.WorkerURL). - Msg("worker handling sqd worker...") - from := blockRange.From - for { + var allBlocksDownloaded bool + + for !allBlocksDownloaded { select { case <-ctx.Done(): return - // todo: indexer.rollback - //case <-f.indexer.rollback: - // log.Info().Msg("stop receiving blocks") - // return default: - blocks, err := r.api.GetBlocks(ctx, from, blockRange.WorkerURL) + blocks, err := r.api.GetBlocks(ctx, from, blockRange.To, blockRange.WorkerURL) if err != nil { - if errors.Is(err, context.Canceled) { - return - } - time.Sleep(time.Second) - continue + log.Err(err). + Uint64("fromLevel", from). + Uint64("toLevel", blockRange.To). + Str("worker url", blockRange.WorkerURL). + Msg("loading blocks error") + return } lastBlock := blocks[len(blocks)-1] - if lastBlock.Header.Number == blockRange.To { - break - } - from = lastBlock.Header.Number + 1 for _, block := range blocks { r.blocks <- block } - r.log.Info(). - Uint64("From", blocks[0].Header.Number). - Uint64("To", lastBlock.Header.Number). - Msg("worker received blocks") + if lastBlock.Header.Number == blockRange.To { + allBlocksDownloaded = true + } else { + from = lastBlock.Header.Number + 1 + } } } - }