Skip to content

Commit

Permalink
Refactoring: receiver (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky authored Sep 8, 2024
1 parent 92d8835 commit 2fb3012
Showing 1 changed file with 124 additions and 89 deletions.
213 changes: 124 additions & 89 deletions pkg/indexer/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,38 @@ type Result struct {
Block Block
Traces []starknet.Trace
StateUpdate starknetData.StateUpdate

mx *sync.Mutex
}

func NewResult() Result {
return Result{
mx: new(sync.Mutex),
}
}

func (r *Result) setBlock(block Block) {
r.mx.Lock()
{
r.Block = block
}
r.mx.Unlock()
}

func (r *Result) setTraces(traces []starknet.Trace) {
r.mx.Lock()
{
r.Traces = traces
}
r.mx.Unlock()
}

func (r *Result) setStateUpdates(stateUpdate starknetData.StateUpdate) {
r.mx.Lock()
{
r.StateUpdate = stateUpdate
}
r.mx.Unlock()
}

// Receiver -
Expand Down Expand Up @@ -100,100 +132,16 @@ func (r *Receiver) worker(ctx context.Context, height uint64) {
var (
result Result
wg sync.WaitGroup
mx = new(sync.Mutex)
)

wg.Add(1)
go func(mx *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
case <-ctx.Done():
return
default:
}

response, err := r.api.GetBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Uint64("height", height).Msg("get block request")
time.Sleep(time.Second)
continue
}
mx.Lock()
{
result.Block = response
}
mx.Unlock()
break
}
}(mx, &wg)
go r.getBlock(ctx, blockId, &result, &wg)

wg.Add(1)
go func(mx *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

api := r.api
for {
select {
case <-ctx.Done():
return
default:
}

response, err := api.TraceBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Uint64("height", height).Msg("get block traces request")
time.Sleep(time.Second)
r.log.Warn().Msg("trying fallback node...")
api = r.fallbackAPI
continue
}

mx.Lock()
{
result.Traces = response
}
mx.Unlock()
break
}
}(mx, &wg)
go r.traceBlock(ctx, blockId, &result, &wg)

wg.Add(1)
go func(mx *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
case <-ctx.Done():
return
default:
}

response, err := r.getStateUpdate(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Uint64("height", height).Msg("state update request")
time.Sleep(time.Second)
continue
}

mx.Lock()
{
result.StateUpdate = response
}
mx.Unlock()
break
}
}(mx, &wg)
go r.receiveStateUpdate(ctx, blockId, &result, &wg)

wg.Wait()

Expand Down Expand Up @@ -261,11 +209,11 @@ func (r *Receiver) QueueSize() int {
return r.pool.QueueSize()
}

func (r *Receiver) getStateUpdate(ctx context.Context, blockId starknetData.BlockID) (starknetData.StateUpdate, error) {
func (r *Receiver) getStateUpdate(ctx context.Context, api API, blockId starknetData.BlockID) (starknetData.StateUpdate, error) {
requestCtx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()

return r.api.GetStateUpdate(requestCtx, blockId)
return api.GetStateUpdate(requestCtx, blockId)
}

func (r *Receiver) Clear() {
Expand All @@ -278,3 +226,90 @@ func (r *Receiver) Clear() {
delete(r.processing, key)
}
}

func (r *Receiver) getBlock(ctx context.Context, blockId starknetData.BlockID, result *Result, wg *sync.WaitGroup) {
defer wg.Done()

api := r.api
for {
select {
case <-ctx.Done():
return
default:
}

response, err := api.GetBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Uint64("height", *blockId.Number).Msg("get block request")
if r.fallbackAPI != nil {
r.log.Warn().Msg("trying fallback node...")
api = r.fallbackAPI
}
time.Sleep(time.Second)
continue
}
result.setBlock(response)
break
}
}

func (r *Receiver) traceBlock(ctx context.Context, blockId starknetData.BlockID, result *Result, wg *sync.WaitGroup) {
defer wg.Done()

api := r.api
for {
select {
case <-ctx.Done():
return
default:
}

response, err := api.TraceBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Uint64("height", *blockId.Number).Msg("get block traces request")
if r.fallbackAPI != nil {
r.log.Warn().Msg("trying fallback node...")
api = r.fallbackAPI
}
time.Sleep(time.Second)
continue
}
result.setTraces(response)
break
}
}

func (r *Receiver) receiveStateUpdate(ctx context.Context, blockId starknetData.BlockID, result *Result, wg *sync.WaitGroup) {
defer wg.Done()

api := r.api
for {
select {
case <-ctx.Done():
return
default:
}

response, err := r.getStateUpdate(ctx, api, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Uint64("height", *blockId.Number).Msg("state update request")
if r.fallbackAPI != nil {
r.log.Warn().Msg("trying fallback node...")
api = r.fallbackAPI
}
time.Sleep(time.Second)
continue
}
result.setStateUpdates(response)
break
}
}

0 comments on commit 2fb3012

Please sign in to comment.