Skip to content

Commit

Permalink
fast-path for getReceipts (#12050)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevemilk authored Sep 29, 2024
1 parent 443fabe commit ad05810
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 32 deletions.
59 changes: 55 additions & 4 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,67 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader

type ReceiptsGetter interface {
GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block) (types.Receipts, error)
GetCachedReceipts(ctx context.Context, blockHash libcommon.Hash) (types.Receipts, bool)
}

func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGetter ReceiptsGetter, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
type cachedReceipts struct {
EncodedReceipts []rlp.RawValue
Bytes int // total size of the encoded receipts
PendingIndex int // index of the first not-found receipt in the query
}

func AnswerGetReceiptsQueryCacheOnly(ctx context.Context, receiptsGetter ReceiptsGetter, query GetReceiptsPacket) (*cachedReceipts, bool, error) {
var (
bytes int
receipts []rlp.RawValue
bytes int
receiptsList []rlp.RawValue
pendingIndex int
needMore = true
)

for lookups, hash := range query {
if bytes >= softResponseLimit || len(receiptsList) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
needMore = false
break
}
if receipts, ok := receiptsGetter.GetCachedReceipts(ctx, hash); ok {
if encoded, err := rlp.EncodeToBytes(receipts); err != nil {
return nil, needMore, fmt.Errorf("failed to encode receipt: %w", err)
} else {
receiptsList = append(receiptsList, encoded)
bytes += len(encoded)
pendingIndex = lookups + 1
}
} else {
break
}
}
if pendingIndex == len(query) {
needMore = false
}
return &cachedReceipts{
EncodedReceipts: receiptsList,
Bytes: bytes,
PendingIndex: pendingIndex,
}, needMore, nil
}

func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGetter ReceiptsGetter, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket, cachedReceipts *cachedReceipts) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
pendingIndex int
)

if cachedReceipts != nil {
bytes = cachedReceipts.Bytes
receipts = cachedReceipts.EncodedReceipts
pendingIndex = cachedReceipts.PendingIndex
}

for lookups := pendingIndex; lookups < len(query); lookups++ {
hash := query[lookups]
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
break
Expand Down
33 changes: 21 additions & 12 deletions p2p/sentry/sentry_multi_client/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,26 +576,35 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In
if !EnableP2PReceipts {
return nil
}

err := cs.getReceiptsActiveGoroutineNumber.Acquire(ctx, 1)
if err != nil {
return err
}
defer cs.getReceiptsActiveGoroutineNumber.Release(1)
var query eth.GetReceiptsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
}

tx, err := cs.db.BeginRo(ctx)
cachedReceipts, needMore, err := eth.AnswerGetReceiptsQueryCacheOnly(ctx, cs.ethApiWrapper, query.GetReceiptsPacket)
if err != nil {
return err
}
defer tx.Rollback()
receiptsList := []rlp.RawValue{}
if cachedReceipts != nil {
receiptsList = cachedReceipts.EncodedReceipts
}
if needMore {
err = cs.getReceiptsActiveGoroutineNumber.Acquire(ctx, 1)
if err != nil {
return err
}
defer cs.getReceiptsActiveGoroutineNumber.Release(1)

tx, err := cs.db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
receiptsList, err = eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket, cachedReceipts)
if err != nil {
return err
}

receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket)
if err != nil {
return err
}
b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
RequestId: query.RequestId,
Expand Down
34 changes: 18 additions & 16 deletions turbo/jsonrpc/erigon_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,26 @@ import (

// GetLogsByHash implements erigon_getLogsByHash. Returns an array of arrays of logs generated by the transactions in the block given by the block's hash.
func (api *ErigonImpl) GetLogsByHash(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
tx, err := api.db.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
receipts, ok := api.getCachedReceipts(ctx, hash)
if !ok {
tx, err := api.db.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()

block, err := api.blockByHashWithSenders(ctx, tx, hash)
if err != nil {
return nil, err
}
if block == nil {
return nil, nil
}
receipts, err := api.getReceipts(ctx, tx, block)
if err != nil {
return nil, fmt.Errorf("getReceipts error: %w", err)
block, err := api.blockByHashWithSenders(ctx, tx, hash)
if err != nil {
return nil, err
}
if block == nil {
return nil, nil
}
receipts, err = api.getReceipts(ctx, tx, block)
if err != nil {
return nil, fmt.Errorf("getReceipts error: %w", err)
}
}

logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
Expand Down
4 changes: 4 additions & 0 deletions turbo/jsonrpc/eth_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (api *BaseAPI) getReceipts(ctx context.Context, tx kv.Tx, block *types.Bloc
return api.receiptsGenerator.GetReceipts(ctx, chainConfig, tx, block)
}

func (api *BaseAPI) getCachedReceipts(ctx context.Context, hash common.Hash) (types.Receipts, bool) {
return api.receiptsGenerator.GetCachedReceipts(ctx, hash)
}

// GetLogs implements eth_getLogs. Returns an array of logs matching a given filter object.
func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) (types.Logs, error) {
var begin, end uint64
Expand Down
4 changes: 4 additions & 0 deletions turbo/jsonrpc/receipts/receipts_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func NewGenerator(cacheSize int, blockReader services.FullBlockReader,
}
}

func (g *Generator) GetCachedReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, bool) {
return g.receiptsCache.Get(blockHash)
}

func (g *Generator) GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block) (types.Receipts, error) {
if receipts, ok := g.receiptsCache.Get(block.Hash()); ok {
return receipts, nil
Expand Down

0 comments on commit ad05810

Please sign in to comment.