Skip to content

Commit

Permalink
feat(eventindexer): eventindexer post ontake fork (#18474)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey authored Nov 14, 2024
1 parent 8d1f9ea commit 83b6f15
Show file tree
Hide file tree
Showing 8 changed files with 457 additions and 3 deletions.
8 changes: 8 additions & 0 deletions packages/eventindexer/cmd/flags/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ var (
Category: indexerCategory,
EnvVars: []string{"INDEX_ERC20S"},
}
OntakeForkHeight = &cli.Uint64Flag{
Name: "ontakeForkHeight",
Usage: "Block number ontake fork height happened",
Value: 21134698,
Category: indexerCategory,
EnvVars: []string{"ONTAKE_FORK_HEIGHT"},
}
)

var IndexerFlags = MergeFlags(CommonFlags, []cli.Flag{
Expand All @@ -87,4 +94,5 @@ var IndexerFlags = MergeFlags(CommonFlags, []cli.Flag{
SyncMode,
IndexNFTs,
IndexERC20s,
OntakeForkHeight,
})
2 changes: 2 additions & 0 deletions packages/eventindexer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
IndexNFTs bool
IndexERC20s bool
Layer string
OntakeForkHeight uint64
OpenDBFunc func() (db.DB, error)
}

Expand All @@ -55,6 +56,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
IndexNFTs: c.Bool(flags.IndexNFTs.Name),
IndexERC20s: c.Bool(flags.IndexERC20s.Name),
Layer: c.String(flags.Layer.Name),
OntakeForkHeight: c.Uint64(flags.OntakeForkHeight.Name),
OpenDBFunc: func() (db.DB, error) {
return db.OpenDBConnection(db.DBConnectionOpts{
Name: c.String(flags.DatabaseUsername.Name),
Expand Down
136 changes: 134 additions & 2 deletions packages/eventindexer/indexer/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,112 @@ func filterFunc(
return nil
}

func filterFuncOntake(
ctx context.Context,
chainID *big.Int,
i *Indexer,
filterOpts *bind.FilterOpts,
) error {
wg, ctx := errgroup.WithContext(ctx)

if i.taikol1 != nil {
wg.Go(func() error {
transitionProvedEvents, err := i.taikol1.FilterTransitionProvedV2(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterTransitionProved")
}

err = i.saveTransitionProvedEventsV2(ctx, chainID, transitionProvedEvents)
if err != nil {
return errors.Wrap(err, "i.saveTransitionProvedEvents")
}

return nil
})

wg.Go(func() error {
transitionContestedEvents, err := i.taikol1.FilterTransitionContestedV2(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterTransitionContested")
}

err = i.saveTransitionContestedEventsV2(ctx, chainID, transitionContestedEvents)
if err != nil {
return errors.Wrap(err, "i.saveTransitionContestedEvents")
}

return nil
})

wg.Go(func() error {
blockProposedEvents, err := i.taikol1.FilterBlockProposedV2(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterBlockProposed")
}

err = i.saveBlockProposedEventsV2(ctx, chainID, blockProposedEvents)
if err != nil {
return errors.Wrap(err, "i.saveBlockProposedEvents")
}

return nil
})

wg.Go(func() error {
blockVerifiedEvents, err := i.taikol1.FilterBlockVerifiedV2(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterBlockVerified")
}

err = i.saveBlockVerifiedEventsV2(ctx, chainID, blockVerifiedEvents)
if err != nil {
return errors.Wrap(err, "i.saveBlockVerifiedEvents")
}

return nil
})
}

if i.bridge != nil {
wg.Go(func() error {
messagesSent, err := i.bridge.FilterMessageSent(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.bridge.FilterMessageSent")
}

err = i.saveMessageSentEvents(ctx, chainID, messagesSent)
if err != nil {
return errors.Wrap(err, "i.saveMessageSentEvents")
}

return nil
})
}

wg.Go(func() error {
if err := i.indexRawBlockData(ctx, chainID, filterOpts.Start, *filterOpts.End); err != nil {
return errors.Wrap(err, "i.indexRawBlockData")
}

return nil
})

err := wg.Wait()

if err != nil {
if errors.Is(err, context.Canceled) {
slog.Error("filter context cancelled")
return err
}

return err
}

return nil
}

func (i *Indexer) filter(
ctx context.Context,
filter FilterFunc,
) error {
endBlockID, err := i.ethClient.BlockNumber(ctx)
if err != nil {
Expand All @@ -138,14 +241,35 @@ func (i *Indexer) filter(
"batchSize", i.blockBatchSize,
)

if i.latestIndexedBlockNumber >= i.ontakeForkHeight {
i.isPostOntakeForkHeightReached = true
}

for j := i.latestIndexedBlockNumber + 1; j <= endBlockID; j += i.blockBatchSize {
end := i.latestIndexedBlockNumber + i.blockBatchSize
end := j + i.blockBatchSize - 1

// if the end of the batch is greater than the latest block number, set end
// to the latest block number
if end > endBlockID {
end = endBlockID
}

if !i.isPostOntakeForkHeightReached && i.taikol1 != nil && i.ontakeForkHeight > i.latestIndexedBlockNumber && i.ontakeForkHeight < end {
slog.Info("ontake fork height reached", "height", i.ontakeForkHeight)

i.isPostOntakeForkHeightReached = true

end = i.ontakeForkHeight - 1

slog.Info("setting end block ID to ontakeForkHeight - 1",
"latestIndexedBlockNumber",
i.latestIndexedBlockNumber,
"ontakeForkHeight", i.ontakeForkHeight,
"endBlockID", end,
"isPostOntakeForkHeightReached", i.isPostOntakeForkHeightReached,
)
}

slog.Info("block batch", "start", j, "end", end)

filterOpts := &bind.FilterOpts{
Expand All @@ -154,6 +278,14 @@ func (i *Indexer) filter(
Context: ctx,
}

var filter FilterFunc

if i.isPostOntakeForkHeightReached {
filter = filterFuncOntake
} else {
filter = filterFunc
}

if err := filter(ctx, new(big.Int).SetUint64(i.srcChainID), i, filterOpts); err != nil {
return errors.Wrap(err, "filter")
}
Expand Down
6 changes: 5 additions & 1 deletion packages/eventindexer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type Indexer struct {

contractToMetadata map[common.Address]*eventindexer.ERC20Metadata
contractToMetadataMutex *sync.Mutex

ontakeForkHeight uint64
isPostOntakeForkHeightReached bool
}

func (i *Indexer) Start() error {
Expand Down Expand Up @@ -97,7 +100,7 @@ func (i *Indexer) eventLoop(ctx context.Context) {
slog.Info("event loop context done")
return
case <-t.C:
if err := i.filter(ctx, filterFunc); err != nil {
if err := i.filter(ctx); err != nil {
slog.Error("error filtering", "error", err)
}
}
Expand Down Expand Up @@ -204,6 +207,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error {
i.layer = cfg.Layer
i.contractToMetadata = make(map[common.Address]*eventindexer.ERC20Metadata, 0)
i.contractToMetadataMutex = &sync.Mutex{}
i.ontakeForkHeight = cfg.OntakeForkHeight

return nil
}
Expand Down
86 changes: 86 additions & 0 deletions packages/eventindexer/indexer/save_block_proposed_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,89 @@ func (i *Indexer) saveBlockProposedEvent(

return nil
}

func (i *Indexer) saveBlockProposedEventsV2(
ctx context.Context,
chainID *big.Int,
events *taikol1.TaikoL1BlockProposedV2Iterator,
) error {
if !events.Next() || events.Event == nil {
slog.Info("no blockProposedV2 events")
return nil
}

wg, ctx := errgroup.WithContext(ctx)

for {
event := events.Event

wg.Go(func() error {
tx, _, err := i.ethClient.TransactionByHash(ctx, event.Raw.TxHash)
if err != nil {
return errors.Wrap(err, "i.ethClient.TransactionByHash")
}

sender, err := i.ethClient.TransactionSender(ctx, tx, event.Raw.BlockHash, event.Raw.TxIndex)
if err != nil {
return errors.Wrap(err, "i.ethClient.TransactionSender")
}

if err := i.saveBlockProposedEventV2(ctx, chainID, event, sender); err != nil {
eventindexer.BlockProposedEventsProcessedError.Inc()

return errors.Wrap(err, "i.saveBlockProposedEvent")
}

return nil
})

if !events.Next() {
break
}
}

if err := wg.Wait(); err != nil {
return err
}

return nil
}

func (i *Indexer) saveBlockProposedEventV2(
ctx context.Context,
chainID *big.Int,
event *taikol1.TaikoL1BlockProposedV2,
sender common.Address,
) error {
slog.Info("blockProposed", "proposer", sender.Hex())

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
}

blockID := event.BlockId.Int64()

block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber))
if err != nil {
return errors.Wrap(err, "i.ethClient.BlockByNumber")
}

_, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockProposed,
Data: string(marshaled),
ChainID: chainID,
Event: eventindexer.EventNameBlockProposed,
Address: sender.Hex(),
BlockID: &blockID,
TransactedAt: time.Unix(int64(block.Time()), 0).UTC(),
EmittedBlockID: event.Raw.BlockNumber,
})
if err != nil {
return errors.Wrap(err, "i.eventRepo.Save")
}

eventindexer.BlockProposedEventsProcessed.Inc()

return nil
}
75 changes: 75 additions & 0 deletions packages/eventindexer/indexer/save_block_verified_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,78 @@ func (i *Indexer) saveBlockVerifiedEvent(

return nil
}

func (i *Indexer) saveBlockVerifiedEventsV2(
ctx context.Context,
chainID *big.Int,
events *taikol1.TaikoL1BlockVerifiedV2Iterator,
) error {
if !events.Next() || events.Event == nil {
slog.Info("no BlockVerified events")
return nil
}

wg, ctx := errgroup.WithContext(ctx)

for {
event := events.Event

wg.Go(func() error {
if err := i.saveBlockVerifiedEventV2(ctx, chainID, event); err != nil {
eventindexer.BlockVerifiedEventsProcessedError.Inc()

return errors.Wrap(err, "i.saveBlockVerifiedEvent")
}

return nil
})

if !events.Next() {
break
}
}

if err := wg.Wait(); err != nil {
return err
}

return nil
}

func (i *Indexer) saveBlockVerifiedEventV2(
ctx context.Context,
chainID *big.Int,
event *taikol1.TaikoL1BlockVerifiedV2,
) error {
slog.Info("new blockVerified event", "blockID", event.BlockId.Int64())

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
}

blockID := event.BlockId.Int64()

block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber))
if err != nil {
return errors.Wrap(err, "i.ethClient.BlockByNumber")
}

_, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockVerified,
Data: string(marshaled),
ChainID: chainID,
Event: eventindexer.EventNameBlockVerified,
Address: "",
BlockID: &blockID,
TransactedAt: time.Unix(int64(block.Time()), 0),
EmittedBlockID: event.Raw.BlockNumber,
})
if err != nil {
return errors.Wrap(err, "i.eventRepo.Save")
}

eventindexer.BlockVerifiedEventsProcessed.Inc()

return nil
}
Loading

0 comments on commit 83b6f15

Please sign in to comment.