Skip to content

Commit

Permalink
Merge pull request #4801 from onflow/petera/4798-add-indexer-metrics
Browse files Browse the repository at this point in the history
[Access] Add metrics to execution state indexer
  • Loading branch information
peterargue authored Oct 6, 2023
2 parents 5e1e793 + 9afb31f commit 2f25cfa
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 63 deletions.
1 change: 1 addition & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

indexerCore, err := indexer.New(
builder.Logger,
metrics.NewExecutionStateIndexerCollector(),
builder.DB,
builder.Storage.RegisterIndex,
builder.Storage.Headers,
Expand Down
8 changes: 8 additions & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@ type ExecutionDataRequesterMetrics interface {
FetchRetried()
}

type ExecutionStateIndexerMetrics interface {
// BlockIndexed records metrics from indexing execution data from a single block.
BlockIndexed(height uint64, duration time.Duration, events, registers, transactionResults int)

// BlockReindexed records that a previously indexed block was indexed again.
BlockReindexed()
}

type RuntimeMetrics interface {
// RuntimeTransactionParsed reports the time spent parsing a single transaction
RuntimeTransactionParsed(dur time.Duration)
Expand Down
90 changes: 90 additions & 0 deletions module/metrics/execution_state_indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/onflow/flow-go/module"
)

var _ module.ExecutionStateIndexerMetrics = (*ExecutionStateIndexerCollector)(nil)

type ExecutionStateIndexerCollector struct {
indexDuration prometheus.Histogram
highestIndexedHeight prometheus.Gauge

indexedEvents prometheus.Counter
indexedRegisters prometheus.Counter
indexedTransactionResults prometheus.Counter
reindexedHeightCount prometheus.Counter
}

func NewExecutionStateIndexerCollector() module.ExecutionStateIndexerMetrics {
indexDuration := promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespaceAccess,
Subsystem: subsystemExecutionStateIndexer,
Name: "index_duration_ms",
Help: "the duration of execution state indexing operation",
Buckets: []float64{1, 5, 10, 50, 100},
})

highestIndexedHeight := promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespaceAccess,
Subsystem: subsystemExecutionStateIndexer,
Name: "highest_indexed_height",
Help: "highest block height that has been indexed",
})

indexedEvents := promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceAccess,
Subsystem: subsystemExecutionStateIndexer,
Name: "indexed_events",
Help: "number of events indexed",
})

indexedRegisters := promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceAccess,
Subsystem: subsystemExecutionStateIndexer,
Name: "indexed_registers",
Help: "number of registers indexed",
})

indexedTransactionResults := promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceAccess,
Subsystem: subsystemExecutionStateIndexer,
Name: "indexed_transaction_results",
Help: "number of transaction results indexed",
})

reindexedHeightCount := promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceAccess,
Subsystem: subsystemExecutionStateIndexer,
Name: "reindexed_height_count",
Help: "number of times a previously indexed height is reindexed",
})

return &ExecutionStateIndexerCollector{
indexDuration: indexDuration,
highestIndexedHeight: highestIndexedHeight,
indexedEvents: indexedEvents,
indexedRegisters: indexedRegisters,
indexedTransactionResults: indexedTransactionResults,
reindexedHeightCount: reindexedHeightCount,
}
}

// BlockIndexed records metrics from indexing execution data from a single block.
func (c *ExecutionStateIndexerCollector) BlockIndexed(height uint64, duration time.Duration, registers, events, transactionResults int) {
c.indexDuration.Observe(float64(duration.Milliseconds()))
c.highestIndexedHeight.Set(float64(height))
c.indexedEvents.Add(float64(events))
c.indexedRegisters.Add(float64(registers))
c.indexedTransactionResults.Add(float64(transactionResults))
}

// BlockReindexed records that a previously indexed block was indexed again.
func (c *ExecutionStateIndexerCollector) BlockReindexed() {
c.reindexedHeightCount.Inc()
}
1 change: 1 addition & 0 deletions module/metrics/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const (
subsystemExeDataProvider = "provider"
subsystemExeDataPruner = "pruner"
subsystemExecutionDataRequester = "execution_data_requester"
subsystemExecutionStateIndexer = "execution_state_indexer"
subsystemExeDataBlobstore = "blobstore"
)

Expand Down
114 changes: 59 additions & 55 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,61 +169,60 @@ func (nc *NoopCollector) ExecutionBlockExecutionEffortVectorComponent(_ string,
func (nc *NoopCollector) ExecutionBlockCachedPrograms(programs int) {}
func (nc *NoopCollector) ExecutionTransactionExecuted(_ time.Duration, _ int, _, _ uint64, _, _ int, _ bool) {
}
func (nc *NoopCollector) ExecutionChunkDataPackGenerated(_, _ int) {}
func (nc *NoopCollector) ExecutionScriptExecuted(dur time.Duration, compUsed, _, _ uint64) {}
func (nc *NoopCollector) ForestApproxMemorySize(bytes uint64) {}
func (nc *NoopCollector) ForestNumberOfTrees(number uint64) {}
func (nc *NoopCollector) LatestTrieRegCount(number uint64) {}
func (nc *NoopCollector) LatestTrieRegCountDiff(number int64) {}
func (nc *NoopCollector) LatestTrieRegSize(size uint64) {}
func (nc *NoopCollector) LatestTrieRegSizeDiff(size int64) {}
func (nc *NoopCollector) LatestTrieMaxDepthTouched(maxDepth uint16) {}
func (nc *NoopCollector) UpdateCount() {}
func (nc *NoopCollector) ProofSize(bytes uint32) {}
func (nc *NoopCollector) UpdateValuesNumber(number uint64) {}
func (nc *NoopCollector) UpdateValuesSize(byte uint64) {}
func (nc *NoopCollector) UpdateDuration(duration time.Duration) {}
func (nc *NoopCollector) UpdateDurationPerItem(duration time.Duration) {}
func (nc *NoopCollector) ReadValuesNumber(number uint64) {}
func (nc *NoopCollector) ReadValuesSize(byte uint64) {}
func (nc *NoopCollector) ReadDuration(duration time.Duration) {}
func (nc *NoopCollector) ReadDurationPerItem(duration time.Duration) {}
func (nc *NoopCollector) ExecutionCollectionRequestSent() {}
func (nc *NoopCollector) ExecutionCollectionRequestRetried() {}
func (nc *NoopCollector) RuntimeTransactionParsed(dur time.Duration) {}
func (nc *NoopCollector) RuntimeTransactionChecked(dur time.Duration) {}
func (nc *NoopCollector) RuntimeTransactionInterpreted(dur time.Duration) {}
func (nc *NoopCollector) RuntimeSetNumberOfAccounts(count uint64) {}
func (nc *NoopCollector) RuntimeTransactionProgramsCacheMiss() {}
func (nc *NoopCollector) RuntimeTransactionProgramsCacheHit() {}
func (nc *NoopCollector) ScriptExecuted(dur time.Duration, size int) {}
func (nc *NoopCollector) ScriptExecutionErrorOnArchiveNode() {}
func (nc *NoopCollector) ScriptExecutionErrorOnExecutionNode() {}
func (nc *NoopCollector) ScriptExecutionResultMismatch() {}
func (nc *NoopCollector) ScriptExecutionResultMatch() {}
func (nc *NoopCollector) ScriptExecutionErrorMismatch() {}
func (nc *NoopCollector) ScriptExecutionErrorMatch() {}
func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {}
func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {}
func (nc *NoopCollector) TransactionSubmissionFailed() {}
func (nc *NoopCollector) UpdateExecutionReceiptMaxHeight(height uint64) {}
func (nc *NoopCollector) UpdateLastFullBlockHeight(height uint64) {}
func (nc *NoopCollector) ChunkDataPackRequestProcessed() {}
func (nc *NoopCollector) ExecutionSync(syncing bool) {}
func (nc *NoopCollector) ExecutionBlockDataUploadStarted() {}
func (nc *NoopCollector) ExecutionBlockDataUploadFinished(dur time.Duration) {}
func (nc *NoopCollector) ExecutionComputationResultUploaded() {}
func (nc *NoopCollector) ExecutionComputationResultUploadRetried() {}
func (nc *NoopCollector) RootIDComputed(duration time.Duration, numberOfChunks int) {}
func (nc *NoopCollector) AddBlobsSucceeded(duration time.Duration, totalSize uint64) {}
func (nc *NoopCollector) AddBlobsFailed() {}
func (nc *NoopCollector) FulfilledHeight(blockHeight uint64) {}
func (nc *NoopCollector) ReceiptSkipped() {}
func (nc *NoopCollector) RequestSucceeded(blockHeight uint64, duration time.Duration, totalSize uint64, numberOfAttempts int) {
}
func (nc *NoopCollector) ExecutionChunkDataPackGenerated(_, _ int) {}
func (nc *NoopCollector) ExecutionScriptExecuted(dur time.Duration, compUsed, _, _ uint64) {}
func (nc *NoopCollector) ForestApproxMemorySize(bytes uint64) {}
func (nc *NoopCollector) ForestNumberOfTrees(number uint64) {}
func (nc *NoopCollector) LatestTrieRegCount(number uint64) {}
func (nc *NoopCollector) LatestTrieRegCountDiff(number int64) {}
func (nc *NoopCollector) LatestTrieRegSize(size uint64) {}
func (nc *NoopCollector) LatestTrieRegSizeDiff(size int64) {}
func (nc *NoopCollector) LatestTrieMaxDepthTouched(maxDepth uint16) {}
func (nc *NoopCollector) UpdateCount() {}
func (nc *NoopCollector) ProofSize(bytes uint32) {}
func (nc *NoopCollector) UpdateValuesNumber(number uint64) {}
func (nc *NoopCollector) UpdateValuesSize(byte uint64) {}
func (nc *NoopCollector) UpdateDuration(duration time.Duration) {}
func (nc *NoopCollector) UpdateDurationPerItem(duration time.Duration) {}
func (nc *NoopCollector) ReadValuesNumber(number uint64) {}
func (nc *NoopCollector) ReadValuesSize(byte uint64) {}
func (nc *NoopCollector) ReadDuration(duration time.Duration) {}
func (nc *NoopCollector) ReadDurationPerItem(duration time.Duration) {}
func (nc *NoopCollector) ExecutionCollectionRequestSent() {}
func (nc *NoopCollector) ExecutionCollectionRequestRetried() {}
func (nc *NoopCollector) RuntimeTransactionParsed(dur time.Duration) {}
func (nc *NoopCollector) RuntimeTransactionChecked(dur time.Duration) {}
func (nc *NoopCollector) RuntimeTransactionInterpreted(dur time.Duration) {}
func (nc *NoopCollector) RuntimeSetNumberOfAccounts(count uint64) {}
func (nc *NoopCollector) RuntimeTransactionProgramsCacheMiss() {}
func (nc *NoopCollector) RuntimeTransactionProgramsCacheHit() {}
func (nc *NoopCollector) ScriptExecuted(dur time.Duration, size int) {}
func (nc *NoopCollector) ScriptExecutionErrorOnArchiveNode() {}
func (nc *NoopCollector) ScriptExecutionErrorOnExecutionNode() {}
func (nc *NoopCollector) ScriptExecutionResultMismatch() {}
func (nc *NoopCollector) ScriptExecutionResultMatch() {}
func (nc *NoopCollector) ScriptExecutionErrorMismatch() {}
func (nc *NoopCollector) ScriptExecutionErrorMatch() {}
func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {}
func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {}
func (nc *NoopCollector) TransactionSubmissionFailed() {}
func (nc *NoopCollector) UpdateExecutionReceiptMaxHeight(height uint64) {}
func (nc *NoopCollector) UpdateLastFullBlockHeight(height uint64) {}
func (nc *NoopCollector) ChunkDataPackRequestProcessed() {}
func (nc *NoopCollector) ExecutionSync(syncing bool) {}
func (nc *NoopCollector) ExecutionBlockDataUploadStarted() {}
func (nc *NoopCollector) ExecutionBlockDataUploadFinished(dur time.Duration) {}
func (nc *NoopCollector) ExecutionComputationResultUploaded() {}
func (nc *NoopCollector) ExecutionComputationResultUploadRetried() {}
func (nc *NoopCollector) RootIDComputed(duration time.Duration, numberOfChunks int) {}
func (nc *NoopCollector) AddBlobsSucceeded(duration time.Duration, totalSize uint64) {}
func (nc *NoopCollector) AddBlobsFailed() {}
func (nc *NoopCollector) FulfilledHeight(blockHeight uint64) {}
func (nc *NoopCollector) ReceiptSkipped() {}
func (nc *NoopCollector) RequestSucceeded(uint64, time.Duration, uint64, int) {}
func (nc *NoopCollector) RequestFailed(duration time.Duration, retryable bool) {}
func (nc *NoopCollector) RequestCanceled() {}
func (nc *NoopCollector) ResponseDropped() {}
Expand Down Expand Up @@ -312,3 +311,8 @@ func (nc *NoopCollector) OnViolationReportSkipped() {}
var _ ObserverMetrics = (*NoopCollector)(nil)

func (nc *NoopCollector) RecordRPC(handler, rpc string, code codes.Code) {}

var _ module.ExecutionStateIndexerMetrics = (*NoopCollector)(nil)

func (nc *NoopCollector) BlockIndexed(uint64, time.Duration, int, int, int) {}
func (nc *NoopCollector) BlockReindexed() {}
21 changes: 17 additions & 4 deletions module/state_synchronization/indexer/indexer_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
Expand All @@ -18,11 +19,13 @@ import (

// IndexerCore indexes the execution state.
type IndexerCore struct {
log zerolog.Logger
metrics module.ExecutionStateIndexerMetrics

registers storage.RegisterIndex
headers storage.Headers
events storage.Events
results storage.LightTransactionResults
log zerolog.Logger
batcher bstorage.BatchBuilder
}

Expand All @@ -31,6 +34,7 @@ type IndexerCore struct {
// won't be initialized to ensure we have bootstrapped the storage first.
func New(
log zerolog.Logger,
metrics module.ExecutionStateIndexerMetrics,
batcher bstorage.BatchBuilder,
registers storage.RegisterIndex,
headers storage.Headers,
Expand All @@ -39,6 +43,7 @@ func New(
) (*IndexerCore, error) {
return &IndexerCore{
log: log.With().Str("component", "execution_indexer").Logger(),
metrics: metrics,
batcher: batcher,
registers: registers,
headers: headers,
Expand Down Expand Up @@ -92,6 +97,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
// for indexing resources which might fail to update the values, so this enables rerunning and reindexing those resources
if block.Height == latest {
lg.Warn().Msg("reindexing block data")
c.metrics.BlockReindexed()
}

start := time.Now()
Expand All @@ -103,6 +109,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
// downloaded and indexed before the block is sealed. However, when a node is catching up, it
// may download the execution data first. In that case, we should index the collections here.

var eventCount, resultCount, registerCount int
g.Go(func() error {
start := time.Now()

Expand Down Expand Up @@ -130,9 +137,12 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
return fmt.Errorf("batch flush error: %w", err)
}

eventCount = len(events)
resultCount = len(results)

lg.Debug().
Int("event_count", len(events)).
Int("result_count", len(results)).
Int("event_count", eventCount).
Int("result_count", resultCount).
Dur("duration_ms", time.Since(start)).
Msg("indexed badger data")

Expand Down Expand Up @@ -168,8 +178,10 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
return fmt.Errorf("could not index register payloads at height %d: %w", block.Height, err)
}

registerCount = len(payloads)

lg.Debug().
Int("register_count", len(payloads)).
Int("register_count", registerCount).
Dur("duration_ms", time.Since(start)).
Msg("indexed registers")

Expand All @@ -181,6 +193,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
return fmt.Errorf("failed to index block data at height %d: %w", block.Height, err)
}

c.metrics.BlockIndexed(block.Height, time.Since(start), registerCount, eventCount, resultCount)
lg.Debug().
Dur("duration_ms", time.Since(start)).
Msg("indexed block data")
Expand Down
Loading

0 comments on commit 2f25cfa

Please sign in to comment.