Skip to content

Commit

Permalink
Sequencer
Browse files Browse the repository at this point in the history
  • Loading branch information
k-karuna committed Dec 17, 2024
1 parent 5582148 commit 03a4417
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 19 deletions.
14 changes: 13 additions & 1 deletion pkg/indexer/sequencer/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sequencer

import (
"context"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api"
)

func (s *Module) listen(ctx context.Context) {
Expand All @@ -13,12 +14,23 @@ func (s *Module) listen(ctx context.Context) {
select {
case <-ctx.Done():
return
case _, ok := <-input.Listen():
case msg, ok := <-input.Listen():
if !ok {
s.Log.Warn().Msg("can't read message from input, it was drained and closed")
s.MustOutput(StopOutput).Push(struct{}{})
return
}
block, ok := msg.(*api.SqdBlockResponse)
if !ok {
s.Log.Warn().Msgf("invalid message type: %T", msg)
continue
}

s.buffer[block.Header.Number] = block

s.Log.Info().
Uint64("ID", block.Header.Number).
Msg("received block")
}
}
}
11 changes: 9 additions & 2 deletions pkg/indexer/sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@ package sequencer

import (
"context"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"sync"
)

type Module struct {
modules.BaseModule
input <-chan *api.SqdBlockResponse

Check failure on line 12 in pkg/indexer/sequencer/sequencer.go

View workflow job for this annotation

GitHub Actions / Linter

field `input` is unused (unused)
output chan *api.SqdBlockResponse

Check failure on line 13 in pkg/indexer/sequencer/sequencer.go

View workflow job for this annotation

GitHub Actions / Linter

field `output` is unused (unused)
buffer map[uint64]*api.SqdBlockResponse
nextNumber uint64

Check failure on line 15 in pkg/indexer/sequencer/sequencer.go

View workflow job for this annotation

GitHub Actions / Linter

field `nextNumber` is unused (unused)
mu sync.Mutex

Check failure on line 16 in pkg/indexer/sequencer/sequencer.go

View workflow job for this annotation

GitHub Actions / Linter

field `mu` is unused (unused)
}

var _ modules.Module = (*Module)(nil)
Expand All @@ -17,15 +24,15 @@ const (
StopOutput = "stop"
)

func New() Module {
func New() *Module {
m := Module{
BaseModule: modules.New("sequencer"),
}
m.CreateInputWithCapacity(InputName, 128)
m.CreateOutput(OutputName)
m.CreateOutput(StopOutput)

return m
return &m
}

func (s *Module) Start(ctx context.Context) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/indexer/sqd_receiver/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func (s *Subsquid) GetWorkerUrl(_ context.Context, startLevel uint64) (string, e
return response.Body().AsString()
}

func (s *Subsquid) GetBlocks(_ context.Context, from, to uint64, workerUrl string) ([]*SqdBlockResponse, error) {
func (s *Subsquid) GetBlocks(_ context.Context, from uint64, workerUrl string) ([]*SqdBlockResponse, error) {
var workerClient = fastshot.NewClient(workerUrl).
Build()

response, err := workerClient.POST("").
Header().AddContentType(mime.JSON).
Body().AsJSON(NewRequest(from, to)).
Body().AsJSON(NewRequest(from)).
Send()

if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/indexer/sqd_receiver/api/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
type Request struct {
Type string `json:"type"`
FromBlock uint64 `json:"fromBlock"`
ToBlock uint64 `json:"toBlock,omitempty"`
IncludeAllBlocks bool `json:"includeAllBlocks"`
Fields Fields `json:"fields,omitempty"`
StateUpdates []map[string]any `json:"stateUpdates,omitempty"`
Expand Down Expand Up @@ -90,11 +89,10 @@ type TransactionWithTrace struct {
Events bool `json:"events"`
}

func NewRequest(fromLevel, toLevel uint64) *Request {
func NewRequest(fromLevel uint64) *Request {
return &Request{
Type: "starknet",
FromBlock: fromLevel,
ToBlock: toLevel,
IncludeAllBlocks: true,
Fields: Fields{
Block: BlockField{
Expand Down
38 changes: 36 additions & 2 deletions pkg/indexer/sqd_receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ type BlocksToWorker struct {

type GetIndexerHeight func() uint64

const (
OutputName = "blocks"
StopOutput = "stop"
)

type Receiver struct {
modules.BaseModule
api *api.Subsquid
startLevel uint64
level uint64
threadsCount int
blocks chan *api.SqdBlockResponse
getIndexerHeight GetIndexerHeight
pool *workerpool.Pool[BlocksToWorker]
processing map[uint64]struct{}
Expand All @@ -37,6 +44,7 @@ type Receiver struct {
log zerolog.Logger
timeout time.Duration
wg *sync.WaitGroup
mx *sync.RWMutex
}

// New -
Expand All @@ -54,30 +62,42 @@ func New(cfg config.Config,
receiver := &Receiver{
BaseModule: modules.New("subsquid receiver"),
startLevel: startLevel,
threadsCount: threadsCount,
getIndexerHeight: getIndexerHeight,
threadsCount: threadsCount,
api: api.NewSubsquid(dsCfg),
blocks: make(chan *api.SqdBlockResponse, cfg.ThreadsCount*10),
processing: make(map[uint64]struct{}),
processingMx: new(sync.Mutex),
log: log.With().Str("module", "subsquid_receiver").Logger(),
timeout: time.Duration(cfg.Timeout) * time.Second,
wg: new(sync.WaitGroup),
mx: new(sync.RWMutex),
}

if receiver.timeout == 0 {
receiver.timeout = 10 * time.Second
}

receiver.CreateOutput(OutputName)
receiver.CreateOutput(StopOutput)

receiver.pool = workerpool.NewPool(receiver.worker, cfg.ThreadsCount)
return receiver, nil
}

// Start -
func (r *Receiver) Start(ctx context.Context) {
r.log.Info().Msg("starting subsquid receiver...")
r.pool.Start(ctx)
level := r.getIndexerHeight()
if r.startLevel > level {
level = r.startLevel
}

r.setLevel(level)

r.pool.Start(ctx)
r.G.GoCtx(ctx, r.sync)
r.G.GoCtx(ctx, r.sequencer)
}

// Close -
Expand Down Expand Up @@ -166,3 +186,17 @@ func (r *Receiver) GetSqdWorkerRanges(ctx context.Context, fromLevel, height uin

return result, nil
}

func (r *Receiver) Level() uint64 {
r.mx.RLock()
defer r.mx.RUnlock()

return r.level
}

func (r *Receiver) setLevel(level uint64) {
r.mx.Lock()
defer r.mx.Unlock()

r.level = level
}
44 changes: 44 additions & 0 deletions pkg/indexer/sqd_receiver/sequencer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sqd_receiver

import (
"context"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api"
)

func (r *Receiver) sequencer(ctx context.Context) {
orderedBlocks := map[uint64]*api.SqdBlockResponse{}
l := r.Level()
currentBlock := l + 1

for {
select {
case <-ctx.Done():
return
case block, ok := <-r.blocks:
if !ok {
r.Log.Warn().Msg("can't read message from input, it was drained and closed")
r.MustOutput(StopOutput).Push(struct{}{})
return
}
orderedBlocks[block.Header.Number] = block

b, ok := orderedBlocks[currentBlock]
for ok {
r.MustOutput(OutputName).Push(b)
r.Log.Info().
Uint64("ID", b.Header.Number).
Msg("sended block")

r.setLevel(currentBlock)
r.Log.Debug().
Uint64("height", currentBlock).
Msg("put in order block")

delete(orderedBlocks, currentBlock)
currentBlock += 1

b, ok = orderedBlocks[currentBlock]
}
}
}
}
14 changes: 5 additions & 9 deletions pkg/indexer/sqd_receiver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ func (r *Receiver) worker(ctx context.Context, blockRange BlocksToWorker) {
Str("URL", blockRange.WorkerURL).
Msg("worker handling sqd worker...")

// todo: move to config
var batchSize uint64 = 1000
from := blockRange.From
to := blockRange.From + batchSize

for {
select {
case <-ctx.Done():
Expand All @@ -25,7 +21,7 @@ func (r *Receiver) worker(ctx context.Context, blockRange BlocksToWorker) {
// log.Info().Msg("stop receiving blocks")
// return
default:
blocks, err := r.api.GetBlocks(ctx, from, to, blockRange.WorkerURL)
blocks, err := r.api.GetBlocks(ctx, from, blockRange.WorkerURL)
if err != nil {
if errors.Is(err, context.Canceled) {
return
Expand All @@ -38,12 +34,12 @@ func (r *Receiver) worker(ctx context.Context, blockRange BlocksToWorker) {
if lastBlock.Header.Number == blockRange.To {
break
}

from = lastBlock.Header.Number + 1
to = from + batchSize
if to > blockRange.To {
to = blockRange.To

for _, block := range blocks {
r.blocks <- block
}

r.log.Info().
Uint64("From", blocks[0].Header.Number).
Uint64("To", lastBlock.Header.Number).
Expand Down

0 comments on commit 03a4417

Please sign in to comment.