Skip to content

Commit

Permalink
add program and register caching on observers
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue committed May 1, 2024
1 parent aa22a87 commit 4a7ac2f
Showing 1 changed file with 68 additions and 10 deletions.
78 changes: 68 additions & 10 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ type ObserverServiceConfig struct {
executionDataConfig edrequester.ExecutionDataConfig
scriptExecMinBlock uint64
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
}

// DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig
Expand Down Expand Up @@ -229,6 +232,9 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
},
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
}

Expand Down Expand Up @@ -737,6 +743,18 @@ func (builder *ObserverServiceBuilder) extraFlags() {
defaultConfig.scriptExecMaxBlock,
"highest block height to allow for script execution. default: no limit")

flags.StringVar(&builder.registerCacheType,
"register-cache-type",
defaultConfig.registerCacheType,
"type of backend cache to use for registers (lru, arc, 2q)")
flags.UintVar(&builder.registerCacheSize,
"register-cache-size",
defaultConfig.registerCacheSize,
"number of registers to cache for script execution. default: 0 (no cache)")
flags.UintVar(&builder.programCacheSize,
"program-cache-size",
defaultConfig.programCacheSize,
"[experimental] number of blocks to cache for cadence programs. use 0 to disable cache. default: 0. Note: this is an experimental feature and may cause nodes to become unstable under certain workloads. Use with caution.")
}).ValidateFlags(func() error {
if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
Expand Down Expand Up @@ -1288,7 +1306,25 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, fmt.Errorf("could not create registers storage: %w", err)
}

builder.Storage.RegisterIndex = registers
if builder.registerCacheSize > 0 {
cacheType, err := pStorage.ParseCacheType(builder.registerCacheType)
if err != nil {
return nil, fmt.Errorf("could not parse register cache type: %w", err)
}
cacheMetrics := metrics.NewCacheCollector(builder.RootChainID)
registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
if err != nil {
return nil, fmt.Errorf("could not create registers cache: %w", err)
}
builder.Storage.RegisterIndex = registersCache
} else {
builder.Storage.RegisterIndex = registers
}

indexerDerivedChainData, queryDerivedChainData, err := builder.buildDerivedChainData()
if err != nil {
return nil, fmt.Errorf("could not create derived chain data: %w", err)
}

var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
indexerCore, err := indexer.New(
Expand All @@ -1302,7 +1338,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.Storage.Transactions,
builder.Storage.LightTransactionResults,
builder.RootChainID.Chain(),
nil,
indexerDerivedChainData,
collectionExecutedMetric,
)
if err != nil {
Expand Down Expand Up @@ -1332,12 +1368,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, err
}

// cache is disabled on observers for now
derivedChainData, err := derived.NewDerivedChainData(1)
if err != nil {
return nil, err
}

// create script execution module, this depends on the indexer being initialized and the
// having the register storage bootstrapped
scripts := execution.NewScripts(
Expand All @@ -1348,8 +1378,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.Storage.Headers,
builder.ExecutionIndexerCore.RegisterValue,
builder.scriptExecutorConfig,
derivedChainData,
false,
queryDerivedChainData,
builder.programCacheSize > 0,
)

err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
Expand Down Expand Up @@ -1462,6 +1492,34 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder
}

// buildDerivedChainData creates the derived chain data for the indexer and the query engine
// If program caching is disabled, the function will return nil for the indexer cache, and a
// derived chain data object for the query engine cache.
func (builder *ObserverServiceBuilder) buildDerivedChainData() (
indexerCache *derived.DerivedChainData,
queryCache *derived.DerivedChainData,
err error,
) {
cacheSize := builder.programCacheSize

// the underlying cache requires size > 0. no data will be written so 1 is fine.
if cacheSize == 0 {
cacheSize = 1
}

derivedChainData, err := derived.NewDerivedChainData(cacheSize)
if err != nil {
return nil, nil, err
}

// writes are done by the indexer. using a nil value effectively disables writes to the cache.
if builder.programCacheSize == 0 {
return nil, derivedChainData, nil
}

return derivedChainData, derivedChainData, nil
}

// enqueuePublicNetworkInit enqueues the observer network component initialized for the observer
func (builder *ObserverServiceBuilder) enqueuePublicNetworkInit() {
var publicLibp2pNode p2p.LibP2PNode
Expand Down

0 comments on commit 4a7ac2f

Please sign in to comment.