Skip to content

Commit

Permalink
Merge pull request #332 from onflow/gregor/batch
Browse files Browse the repository at this point in the history
Batch index updates
  • Loading branch information
sideninja authored Jul 9, 2024
2 parents 11f6124 + c09bf25 commit e420b0d
Show file tree
Hide file tree
Showing 26 changed files with 400 additions and 251 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fix-lint:

.PHONY: generate
generate:
go get -d github.com/vektra/mockery/[email protected]
go install github.com/vektra/mockery/[email protected]
mockery --dir=storage --name=BlockIndexer --output=storage/mocks
mockery --dir=storage --name=ReceiptIndexer --output=storage/mocks
mockery --dir=storage --name=TransactionIndexer --output=storage/mocks
Expand Down
8 changes: 3 additions & 5 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ func (b *BlockChainAPI) GetTransactionByHash(
return handleError[*Transaction](b.logger, err)
}

txHash := tx.Hash()

rcp, err := b.receipts.GetByTransactionID(txHash)
rcp, err := b.receipts.GetByTransactionID(hash)
if err != nil {
return handleError[*Transaction](b.logger, err)
}
Expand Down Expand Up @@ -609,7 +607,7 @@ func (b *BlockChainAPI) GetTransactionCount(
return handleError[*hexutil.Uint64](b.logger, err)
}

nonce, err := b.accounts.GetNonce(&address)
nonce, err := b.accounts.GetNonce(address)
if err != nil {
b.logger.Error().Err(err).Msg("get nonce failed")
return handleError[*hexutil.Uint64](b.logger, errs.ErrInternal)
Expand Down Expand Up @@ -725,7 +723,7 @@ func (b *BlockChainAPI) fetchBlockTransactions(
return nil, err
}
if transaction == nil {
b.logger.Warn().
b.logger.Error().
Str("tx-hash", txHash.String()).
Uint64("evm-height", block.Height).
Msg("not found a transaction the block references")
Expand Down
17 changes: 10 additions & 7 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ func Start(ctx context.Context, cfg *config.Config) error {
logger = logger.Level(cfg.LogLevel)
logger.Info().Msg("starting up the EVM gateway")

pebbleDB, err := pebble.New(cfg.DatabaseDir, logger)
store, err := pebble.New(cfg.DatabaseDir, logger)
if err != nil {
return err
}

blocks := pebble.NewBlocks(pebbleDB)
transactions := pebble.NewTransactions(pebbleDB)
receipts := pebble.NewReceipts(pebbleDB)
accounts := pebble.NewAccounts(pebbleDB)
trace := pebble.NewTraces(pebbleDB)
blocks := pebble.NewBlocks(store)
transactions := pebble.NewTransactions(store)
receipts := pebble.NewReceipts(store)
accounts := pebble.NewAccounts(store)
trace := pebble.NewTraces(store)

blocksBroadcaster := broadcast.NewBroadcaster()
transactionsBroadcaster := broadcast.NewBroadcaster()
Expand All @@ -48,7 +48,7 @@ func Start(ctx context.Context, cfg *config.Config) error {
// this should only be used locally or for testing
if cfg.ForceStartCadenceHeight != 0 {
logger.Warn().Uint64("height", cfg.ForceStartCadenceHeight).Msg("force setting starting Cadence height!!!")
if err := blocks.SetLatestCadenceHeight(cfg.ForceStartCadenceHeight); err != nil {
if err := blocks.SetLatestCadenceHeight(cfg.ForceStartCadenceHeight, nil); err != nil {
return err
}
}
Expand Down Expand Up @@ -122,6 +122,7 @@ func Start(ctx context.Context, cfg *config.Config) error {
ctx,
cfg,
client,
store,
blocks,
transactions,
receipts,
Expand All @@ -143,6 +144,7 @@ func startIngestion(
ctx context.Context,
cfg *config.Config,
client *requester.CrossSporkClient,
store *pebble.Storage,
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
Expand Down Expand Up @@ -216,6 +218,7 @@ func startIngestion(

eventEngine := ingestion.NewEventIngestionEngine(
subscriber,
store,
blocks,
receipts,
transactions,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
cloud.google.com/go/storage v1.36.0
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593
github.com/goccy/go-json v0.10.2
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/onflow/cadence v1.0.0-preview.35
github.com/onflow/flow-go v0.35.14-crescendo-preview.27.0.20240626210601-604590f19db9
github.com/onflow/flow-go-sdk v1.0.0-preview.37
Expand Down Expand Up @@ -83,6 +82,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
Expand Down
70 changes: 48 additions & 22 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"context"
"fmt"

pebbleDB "github.com/cockroachdb/pebble"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/fvm/evm/types"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"
)

var _ models.Engine = &Engine{}

type Engine struct {
subscriber EventSubscriber
store *pebble.Storage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
Expand All @@ -31,6 +34,7 @@ type Engine struct {

func NewEventIngestionEngine(
subscriber EventSubscriber,
store *pebble.Storage,
blocks storage.BlockIndexer,
receipts storage.ReceiptIndexer,
transactions storage.TransactionIndexer,
Expand All @@ -44,6 +48,7 @@ func NewEventIngestionEngine(

return &Engine{
subscriber: subscriber,
store: store,
blocks: blocks,
receipts: receipts,
transactions: transactions,
Expand Down Expand Up @@ -132,19 +137,27 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {

// if heartbeat interval with no data still update the cadence height
if events.Empty() {
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight()); err != nil {
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil {
return fmt.Errorf("failed to update to latest cadence height during events ingestion: %w", err)
}
return nil // nothing else to do this was heartbeat event with not event payloads
}

batch := e.store.NewBatch()
defer batch.Close()

// we first index evm blocks only then transactions if any present
blocks, err := events.Blocks()
if err != nil {
return err
}
for _, block := range blocks {
if err := e.indexBlock(events.CadenceHeight(), events.CadenceBlockID(), block); err != nil {
if err := e.indexBlock(
events.CadenceHeight(),
events.CadenceBlockID(),
block,
batch,
); err != nil {
return err
}
}
Expand All @@ -154,15 +167,36 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return err
}
for i, tx := range txs {
if err := e.indexTransaction(tx, receipts[i]); err != nil {
if err := e.indexTransaction(tx, receipts[i], batch); err != nil {
return err
}
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit indexed data: %w", err)
}

// emit events for each block, transaction and logs, only after we successfully commit the data
for range blocks {
e.blocksBroadcaster.Publish()
}

for _, r := range receipts {
e.transactionsBroadcaster.Publish()
if len(r.Logs) > 0 {
e.logsBroadcaster.Publish()
}
}

return nil
}

func (e *Engine) indexBlock(cadenceHeight uint64, cadenceID flow.Identifier, block *types.Block) error {
func (e *Engine) indexBlock(
cadenceHeight uint64,
cadenceID flow.Identifier,
block *types.Block,
batch *pebbleDB.Batch,
) error {
if block == nil { // safety check shouldn't happen
return fmt.Errorf("can't process empty block")
}
Expand Down Expand Up @@ -190,49 +224,41 @@ func (e *Engine) indexBlock(cadenceHeight uint64, cadenceID flow.Identifier, blo
Strs("tx-hashes", txHashes).
Msg("new evm block executed event")

// todo should probably be batch in the same as bellow tx
if err := e.blocks.Store(cadenceHeight, cadenceID, block); err != nil {
if err := e.blocks.Store(cadenceHeight, cadenceID, block, batch); err != nil {
return err
}

e.blocksBroadcaster.Publish()
return nil
}

func (e *Engine) indexTransaction(tx models.Transaction, receipt *models.StorageReceipt) error {
func (e *Engine) indexTransaction(
tx models.Transaction,
receipt *models.StorageReceipt,
batch *pebbleDB.Batch,
) error {
if tx == nil || receipt == nil { // safety check shouldn't happen
return fmt.Errorf("can't process empty tx or receipt")
}

txHash := tx.Hash()

e.log.Info().
Str("contract-address", receipt.ContractAddress.String()).
Int("log-count", len(receipt.Logs)).
Uint64("evm-height", receipt.BlockNumber.Uint64()).
Uint("tx-index", receipt.TransactionIndex).
Str("tx-hash", txHash.String()).
Str("tx-hash", tx.Hash().String()).
Msg("ingesting new transaction executed event")

// todo think if we could introduce batching
if err := e.transactions.Store(tx); err != nil {
if err := e.transactions.Store(tx, batch); err != nil {
return fmt.Errorf("failed to store tx: %w", err)
}

if err := e.accounts.Update(tx, receipt); err != nil {
if err := e.accounts.Update(tx, receipt, batch); err != nil {
return fmt.Errorf("failed to update accounts: %w", err)
}

if err := e.receipts.Store(receipt); err != nil {
if err := e.receipts.Store(receipt, batch); err != nil {
return fmt.Errorf("failed to store receipt: %w", err)
}

e.transactionsBroadcaster.Publish()

// only notify if we have new logs
if len(receipt.Logs) > 0 {
e.logsBroadcaster.Publish()
}

return nil
}
Loading

0 comments on commit e420b0d

Please sign in to comment.