From 9afb31f4f57da0236cd547bbd697cfc5937dafc1 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Thu, 5 Oct 2023 12:44:24 -0700 Subject: [PATCH] [Access] Add metrics to execution state indexer --- .../node_builder/access_node_builder.go | 1 + module/metrics.go | 8 ++ module/metrics/execution_state_indexer.go | 90 ++++++++++++++ module/metrics/namespaces.go | 1 + module/metrics/noop.go | 114 +++++++++--------- .../indexer/indexer_core.go | 21 +++- .../indexer/indexer_core_test.go | 12 +- 7 files changed, 184 insertions(+), 63 deletions(-) create mode 100644 module/metrics/execution_state_indexer.go diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index c875480769e..a42160bc4d9 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -712,6 +712,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess indexerCore, err := indexer.New( builder.Logger, + metrics.NewExecutionStateIndexerCollector(), builder.DB, builder.Storage.RegisterIndex, builder.Storage.Headers, diff --git a/module/metrics.go b/module/metrics.go index bd8215ca452..12d836e243b 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -559,6 +559,14 @@ type ExecutionDataRequesterMetrics interface { FetchRetried() } +type ExecutionStateIndexerMetrics interface { + // BlockIndexed records metrics from indexing execution data from a single block. + BlockIndexed(height uint64, duration time.Duration, events, registers, transactionResults int) + + // BlockReindexed records that a previously indexed block was indexed again. + BlockReindexed() +} + type RuntimeMetrics interface { // RuntimeTransactionParsed reports the time spent parsing a single transaction RuntimeTransactionParsed(dur time.Duration) diff --git a/module/metrics/execution_state_indexer.go b/module/metrics/execution_state_indexer.go new file mode 100644 index 00000000000..ae1eee079a1 --- /dev/null +++ b/module/metrics/execution_state_indexer.go @@ -0,0 +1,90 @@ +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/onflow/flow-go/module" +) + +var _ module.ExecutionStateIndexerMetrics = (*ExecutionStateIndexerCollector)(nil) + +type ExecutionStateIndexerCollector struct { + indexDuration prometheus.Histogram + highestIndexedHeight prometheus.Gauge + + indexedEvents prometheus.Counter + indexedRegisters prometheus.Counter + indexedTransactionResults prometheus.Counter + reindexedHeightCount prometheus.Counter +} + +func NewExecutionStateIndexerCollector() module.ExecutionStateIndexerMetrics { + indexDuration := promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespaceAccess, + Subsystem: subsystemExecutionStateIndexer, + Name: "index_duration_ms", + Help: "the duration of execution state indexing operation", + Buckets: []float64{1, 5, 10, 50, 100}, + }) + + highestIndexedHeight := promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespaceAccess, + Subsystem: subsystemExecutionStateIndexer, + Name: "highest_indexed_height", + Help: "highest block height that has been indexed", + }) + + indexedEvents := promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespaceAccess, + Subsystem: subsystemExecutionStateIndexer, + Name: "indexed_events", + Help: "number of events indexed", + }) + + indexedRegisters := promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespaceAccess, + Subsystem: subsystemExecutionStateIndexer, + Name: "indexed_registers", + Help: "number of registers indexed", + }) + + indexedTransactionResults := promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespaceAccess, + Subsystem: subsystemExecutionStateIndexer, + Name: "indexed_transaction_results", + Help: "number of transaction results indexed", + }) + + reindexedHeightCount := promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespaceAccess, + Subsystem: subsystemExecutionStateIndexer, + Name: "reindexed_height_count", + Help: "number of times a previously indexed height is reindexed", + }) + + return &ExecutionStateIndexerCollector{ + indexDuration: indexDuration, + highestIndexedHeight: highestIndexedHeight, + indexedEvents: indexedEvents, + indexedRegisters: indexedRegisters, + indexedTransactionResults: indexedTransactionResults, + reindexedHeightCount: reindexedHeightCount, + } +} + +// BlockIndexed records metrics from indexing execution data from a single block. +func (c *ExecutionStateIndexerCollector) BlockIndexed(height uint64, duration time.Duration, registers, events, transactionResults int) { + c.indexDuration.Observe(float64(duration.Milliseconds())) + c.highestIndexedHeight.Set(float64(height)) + c.indexedEvents.Add(float64(events)) + c.indexedRegisters.Add(float64(registers)) + c.indexedTransactionResults.Add(float64(transactionResults)) +} + +// BlockReindexed records that a previously indexed block was indexed again. +func (c *ExecutionStateIndexerCollector) BlockReindexed() { + c.reindexedHeightCount.Inc() +} diff --git a/module/metrics/namespaces.go b/module/metrics/namespaces.go index f89f2a530ae..950c374f264 100644 --- a/module/metrics/namespaces.go +++ b/module/metrics/namespaces.go @@ -91,6 +91,7 @@ const ( subsystemExeDataProvider = "provider" subsystemExeDataPruner = "pruner" subsystemExecutionDataRequester = "execution_data_requester" + subsystemExecutionStateIndexer = "execution_state_indexer" subsystemExeDataBlobstore = "blobstore" ) diff --git a/module/metrics/noop.go b/module/metrics/noop.go index c5766802214..90bc3056de2 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -169,61 +169,60 @@ func (nc *NoopCollector) ExecutionBlockExecutionEffortVectorComponent(_ string, func (nc *NoopCollector) ExecutionBlockCachedPrograms(programs int) {} func (nc *NoopCollector) ExecutionTransactionExecuted(_ time.Duration, _ int, _, _ uint64, _, _ int, _ bool) { } -func (nc *NoopCollector) ExecutionChunkDataPackGenerated(_, _ int) {} -func (nc *NoopCollector) ExecutionScriptExecuted(dur time.Duration, compUsed, _, _ uint64) {} -func (nc *NoopCollector) ForestApproxMemorySize(bytes uint64) {} -func (nc *NoopCollector) ForestNumberOfTrees(number uint64) {} -func (nc *NoopCollector) LatestTrieRegCount(number uint64) {} -func (nc *NoopCollector) LatestTrieRegCountDiff(number int64) {} -func (nc *NoopCollector) LatestTrieRegSize(size uint64) {} -func (nc *NoopCollector) LatestTrieRegSizeDiff(size int64) {} -func (nc *NoopCollector) LatestTrieMaxDepthTouched(maxDepth uint16) {} -func (nc *NoopCollector) UpdateCount() {} -func (nc *NoopCollector) ProofSize(bytes uint32) {} -func (nc *NoopCollector) UpdateValuesNumber(number uint64) {} -func (nc *NoopCollector) UpdateValuesSize(byte uint64) {} -func (nc *NoopCollector) UpdateDuration(duration time.Duration) {} -func (nc *NoopCollector) UpdateDurationPerItem(duration time.Duration) {} -func (nc *NoopCollector) ReadValuesNumber(number uint64) {} -func (nc *NoopCollector) ReadValuesSize(byte uint64) {} -func (nc *NoopCollector) ReadDuration(duration time.Duration) {} -func (nc *NoopCollector) ReadDurationPerItem(duration time.Duration) {} -func (nc *NoopCollector) ExecutionCollectionRequestSent() {} -func (nc *NoopCollector) ExecutionCollectionRequestRetried() {} -func (nc *NoopCollector) RuntimeTransactionParsed(dur time.Duration) {} -func (nc *NoopCollector) RuntimeTransactionChecked(dur time.Duration) {} -func (nc *NoopCollector) RuntimeTransactionInterpreted(dur time.Duration) {} -func (nc *NoopCollector) RuntimeSetNumberOfAccounts(count uint64) {} -func (nc *NoopCollector) RuntimeTransactionProgramsCacheMiss() {} -func (nc *NoopCollector) RuntimeTransactionProgramsCacheHit() {} -func (nc *NoopCollector) ScriptExecuted(dur time.Duration, size int) {} -func (nc *NoopCollector) ScriptExecutionErrorOnArchiveNode() {} -func (nc *NoopCollector) ScriptExecutionErrorOnExecutionNode() {} -func (nc *NoopCollector) ScriptExecutionResultMismatch() {} -func (nc *NoopCollector) ScriptExecutionResultMatch() {} -func (nc *NoopCollector) ScriptExecutionErrorMismatch() {} -func (nc *NoopCollector) ScriptExecutionErrorMatch() {} -func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {} -func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {} -func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {} -func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {} -func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {} -func (nc *NoopCollector) TransactionSubmissionFailed() {} -func (nc *NoopCollector) UpdateExecutionReceiptMaxHeight(height uint64) {} -func (nc *NoopCollector) UpdateLastFullBlockHeight(height uint64) {} -func (nc *NoopCollector) ChunkDataPackRequestProcessed() {} -func (nc *NoopCollector) ExecutionSync(syncing bool) {} -func (nc *NoopCollector) ExecutionBlockDataUploadStarted() {} -func (nc *NoopCollector) ExecutionBlockDataUploadFinished(dur time.Duration) {} -func (nc *NoopCollector) ExecutionComputationResultUploaded() {} -func (nc *NoopCollector) ExecutionComputationResultUploadRetried() {} -func (nc *NoopCollector) RootIDComputed(duration time.Duration, numberOfChunks int) {} -func (nc *NoopCollector) AddBlobsSucceeded(duration time.Duration, totalSize uint64) {} -func (nc *NoopCollector) AddBlobsFailed() {} -func (nc *NoopCollector) FulfilledHeight(blockHeight uint64) {} -func (nc *NoopCollector) ReceiptSkipped() {} -func (nc *NoopCollector) RequestSucceeded(blockHeight uint64, duration time.Duration, totalSize uint64, numberOfAttempts int) { -} +func (nc *NoopCollector) ExecutionChunkDataPackGenerated(_, _ int) {} +func (nc *NoopCollector) ExecutionScriptExecuted(dur time.Duration, compUsed, _, _ uint64) {} +func (nc *NoopCollector) ForestApproxMemorySize(bytes uint64) {} +func (nc *NoopCollector) ForestNumberOfTrees(number uint64) {} +func (nc *NoopCollector) LatestTrieRegCount(number uint64) {} +func (nc *NoopCollector) LatestTrieRegCountDiff(number int64) {} +func (nc *NoopCollector) LatestTrieRegSize(size uint64) {} +func (nc *NoopCollector) LatestTrieRegSizeDiff(size int64) {} +func (nc *NoopCollector) LatestTrieMaxDepthTouched(maxDepth uint16) {} +func (nc *NoopCollector) UpdateCount() {} +func (nc *NoopCollector) ProofSize(bytes uint32) {} +func (nc *NoopCollector) UpdateValuesNumber(number uint64) {} +func (nc *NoopCollector) UpdateValuesSize(byte uint64) {} +func (nc *NoopCollector) UpdateDuration(duration time.Duration) {} +func (nc *NoopCollector) UpdateDurationPerItem(duration time.Duration) {} +func (nc *NoopCollector) ReadValuesNumber(number uint64) {} +func (nc *NoopCollector) ReadValuesSize(byte uint64) {} +func (nc *NoopCollector) ReadDuration(duration time.Duration) {} +func (nc *NoopCollector) ReadDurationPerItem(duration time.Duration) {} +func (nc *NoopCollector) ExecutionCollectionRequestSent() {} +func (nc *NoopCollector) ExecutionCollectionRequestRetried() {} +func (nc *NoopCollector) RuntimeTransactionParsed(dur time.Duration) {} +func (nc *NoopCollector) RuntimeTransactionChecked(dur time.Duration) {} +func (nc *NoopCollector) RuntimeTransactionInterpreted(dur time.Duration) {} +func (nc *NoopCollector) RuntimeSetNumberOfAccounts(count uint64) {} +func (nc *NoopCollector) RuntimeTransactionProgramsCacheMiss() {} +func (nc *NoopCollector) RuntimeTransactionProgramsCacheHit() {} +func (nc *NoopCollector) ScriptExecuted(dur time.Duration, size int) {} +func (nc *NoopCollector) ScriptExecutionErrorOnArchiveNode() {} +func (nc *NoopCollector) ScriptExecutionErrorOnExecutionNode() {} +func (nc *NoopCollector) ScriptExecutionResultMismatch() {} +func (nc *NoopCollector) ScriptExecutionResultMatch() {} +func (nc *NoopCollector) ScriptExecutionErrorMismatch() {} +func (nc *NoopCollector) ScriptExecutionErrorMatch() {} +func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {} +func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {} +func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {} +func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {} +func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {} +func (nc *NoopCollector) TransactionSubmissionFailed() {} +func (nc *NoopCollector) UpdateExecutionReceiptMaxHeight(height uint64) {} +func (nc *NoopCollector) UpdateLastFullBlockHeight(height uint64) {} +func (nc *NoopCollector) ChunkDataPackRequestProcessed() {} +func (nc *NoopCollector) ExecutionSync(syncing bool) {} +func (nc *NoopCollector) ExecutionBlockDataUploadStarted() {} +func (nc *NoopCollector) ExecutionBlockDataUploadFinished(dur time.Duration) {} +func (nc *NoopCollector) ExecutionComputationResultUploaded() {} +func (nc *NoopCollector) ExecutionComputationResultUploadRetried() {} +func (nc *NoopCollector) RootIDComputed(duration time.Duration, numberOfChunks int) {} +func (nc *NoopCollector) AddBlobsSucceeded(duration time.Duration, totalSize uint64) {} +func (nc *NoopCollector) AddBlobsFailed() {} +func (nc *NoopCollector) FulfilledHeight(blockHeight uint64) {} +func (nc *NoopCollector) ReceiptSkipped() {} +func (nc *NoopCollector) RequestSucceeded(uint64, time.Duration, uint64, int) {} func (nc *NoopCollector) RequestFailed(duration time.Duration, retryable bool) {} func (nc *NoopCollector) RequestCanceled() {} func (nc *NoopCollector) ResponseDropped() {} @@ -312,3 +311,8 @@ func (nc *NoopCollector) OnViolationReportSkipped() {} var _ ObserverMetrics = (*NoopCollector)(nil) func (nc *NoopCollector) RecordRPC(handler, rpc string, code codes.Code) {} + +var _ module.ExecutionStateIndexerMetrics = (*NoopCollector)(nil) + +func (nc *NoopCollector) BlockIndexed(uint64, time.Duration, int, int, int) {} +func (nc *NoopCollector) BlockReindexed() {} diff --git a/module/state_synchronization/indexer/indexer_core.go b/module/state_synchronization/indexer/indexer_core.go index 9027fd222ef..4e5804ac2ad 100644 --- a/module/state_synchronization/indexer/indexer_core.go +++ b/module/state_synchronization/indexer/indexer_core.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" @@ -18,11 +19,13 @@ import ( // IndexerCore indexes the execution state. type IndexerCore struct { + log zerolog.Logger + metrics module.ExecutionStateIndexerMetrics + registers storage.RegisterIndex headers storage.Headers events storage.Events results storage.LightTransactionResults - log zerolog.Logger batcher bstorage.BatchBuilder } @@ -31,6 +34,7 @@ type IndexerCore struct { // won't be initialized to ensure we have bootstrapped the storage first. func New( log zerolog.Logger, + metrics module.ExecutionStateIndexerMetrics, batcher bstorage.BatchBuilder, registers storage.RegisterIndex, headers storage.Headers, @@ -39,6 +43,7 @@ func New( ) (*IndexerCore, error) { return &IndexerCore{ log: log.With().Str("component", "execution_indexer").Logger(), + metrics: metrics, batcher: batcher, registers: registers, headers: headers, @@ -92,6 +97,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti // for indexing resources which might fail to update the values, so this enables rerunning and reindexing those resources if block.Height == latest { lg.Warn().Msg("reindexing block data") + c.metrics.BlockReindexed() } start := time.Now() @@ -103,6 +109,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti // downloaded and indexed before the block is sealed. However, when a node is catching up, it // may download the execution data first. In that case, we should index the collections here. + var eventCount, resultCount, registerCount int g.Go(func() error { start := time.Now() @@ -130,9 +137,12 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti return fmt.Errorf("batch flush error: %w", err) } + eventCount = len(events) + resultCount = len(results) + lg.Debug(). - Int("event_count", len(events)). - Int("result_count", len(results)). + Int("event_count", eventCount). + Int("result_count", resultCount). Dur("duration_ms", time.Since(start)). Msg("indexed badger data") @@ -168,8 +178,10 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti return fmt.Errorf("could not index register payloads at height %d: %w", block.Height, err) } + registerCount = len(payloads) + lg.Debug(). - Int("register_count", len(payloads)). + Int("register_count", registerCount). Dur("duration_ms", time.Since(start)). Msg("indexed registers") @@ -181,6 +193,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti return fmt.Errorf("failed to index block data at height %d: %w", block.Height, err) } + c.metrics.BlockIndexed(block.Height, time.Since(start), registerCount, eventCount, resultCount) lg.Debug(). Dur("duration_ms", time.Since(start)). Msg("indexed block data") diff --git a/module/state_synchronization/indexer/indexer_core_test.go b/module/state_synchronization/indexer/indexer_core_test.go index 9254ba32184..65bac290fb2 100644 --- a/module/state_synchronization/indexer/indexer_core_test.go +++ b/module/state_synchronization/indexer/indexer_core_test.go @@ -22,6 +22,7 @@ import ( "github.com/onflow/flow-go/ledger/complete" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/executiondatasync/execution_data" + "github.com/onflow/flow-go/module/metrics" synctest "github.com/onflow/flow-go/module/state_synchronization/requester/unittest" "github.com/onflow/flow-go/storage" storagemock "github.com/onflow/flow-go/storage/mock" @@ -155,7 +156,7 @@ func (i *indexCoreTest) initIndexer() *indexCoreTest { require.NoError(i.t, os.RemoveAll(dbDir)) }) - indexer, err := New(zerolog.New(os.Stdout), db, i.registers, i.headers, i.events, i.results) + indexer, err := New(zerolog.New(os.Stdout), metrics.NewNoopCollector(), db, i.registers, i.headers, i.events, i.results) require.NoError(i.t, err) i.indexer = indexer return i @@ -601,10 +602,13 @@ func TestIndexerIntegration_StoreAndGet(t *testing.T) { require.NoError(t, os.RemoveAll(dbDir)) }) + logger := zerolog.Nop() + metrics := metrics.NewNoopCollector() + // this test makes sure index values for a single register are correctly updated and always last value is returned t.Run("Single Index Value Changes", func(t *testing.T) { pebbleStorage.RunWithRegistersStorageAtInitialHeights(t, 0, 0, func(registers *pebbleStorage.Registers) { - index, err := New(zerolog.Nop(), db, registers, nil, nil, nil) + index, err := New(logger, metrics, db, registers, nil, nil, nil) require.NoError(t, err) values := [][]byte{[]byte("1"), []byte("1"), []byte("2"), []byte("3") /*nil,*/, []byte("4")} @@ -631,7 +635,7 @@ func TestIndexerIntegration_StoreAndGet(t *testing.T) { // e.g. we index A{h(1) -> X}, A{h(2) -> Y}, when we request h(4) we get value Y t.Run("Single Index Value At Later Heights", func(t *testing.T) { pebbleStorage.RunWithRegistersStorageAtInitialHeights(t, 0, 0, func(registers *pebbleStorage.Registers) { - index, err := New(zerolog.Nop(), db, registers, nil, nil, nil) + index, err := New(logger, metrics, db, registers, nil, nil, nil) require.NoError(t, err) storeValues := [][]byte{[]byte("1"), []byte("2")} @@ -662,7 +666,7 @@ func TestIndexerIntegration_StoreAndGet(t *testing.T) { // this test makes sure we correctly handle weird payloads t.Run("Empty and Nil Payloads", func(t *testing.T) { pebbleStorage.RunWithRegistersStorageAtInitialHeights(t, 0, 0, func(registers *pebbleStorage.Registers) { - index, err := New(zerolog.Nop(), db, registers, nil, nil, nil) + index, err := New(logger, metrics, db, registers, nil, nil, nil) require.NoError(t, err) require.NoError(t, index.indexRegisters(map[ledger.Path]*ledger.Payload{}, 1))