Skip to content

Commit

Permalink
Merge pull request onflow#5826 from onflow/petera/access-script-cachi…
Browse files Browse the repository at this point in the history
…ng-backports

[Access] Backport register and program cache to master
  • Loading branch information
peterargue authored May 2, 2024
2 parents 8fac7a4 + 4a7ac2f commit 84a39b0
Show file tree
Hide file tree
Showing 23 changed files with 1,004 additions and 98 deletions.
77 changes: 72 additions & 5 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/onflow/flow-go/engine/common/requester"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/execution/computation/query"
"github.com/onflow/flow-go/fvm/storage/derived"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/bootstrap"
Expand Down Expand Up @@ -153,6 +154,9 @@ type AccessNodeConfig struct {
scriptExecutorConfig query.QueryConfig
scriptExecMinBlock uint64
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -247,6 +251,9 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
}

Expand Down Expand Up @@ -784,7 +791,25 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, fmt.Errorf("could not create registers storage: %w", err)
}

builder.Storage.RegisterIndex = registers
if builder.registerCacheSize > 0 {
cacheType, err := pStorage.ParseCacheType(builder.registerCacheType)
if err != nil {
return nil, fmt.Errorf("could not parse register cache type: %w", err)
}
cacheMetrics := metrics.NewCacheCollector(builder.RootChainID)
registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
if err != nil {
return nil, fmt.Errorf("could not create registers cache: %w", err)
}
builder.Storage.RegisterIndex = registersCache
} else {
builder.Storage.RegisterIndex = registers
}

indexerDerivedChainData, queryDerivedChainData, err := builder.buildDerivedChainData()
if err != nil {
return nil, fmt.Errorf("could not create derived chain data: %w", err)
}

indexerCore, err := indexer.New(
builder.Logger,
Expand All @@ -796,6 +821,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.Storage.Collections,
builder.Storage.Transactions,
builder.Storage.LightTransactionResults,
builder.RootChainID.Chain(),
indexerDerivedChainData,
builder.collectionExecutedMetric,
)
if err != nil {
Expand All @@ -822,18 +849,17 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

// create script execution module, this depends on the indexer being initialized and the
// having the register storage bootstrapped
scripts, err := execution.NewScripts(
scripts := execution.NewScripts(
builder.Logger,
metrics.NewExecutionCollector(builder.Tracer),
builder.RootChainID,
query.NewProtocolStateWrapper(builder.State),
builder.Storage.Headers,
builder.ExecutionIndexerCore.RegisterValue,
builder.scriptExecutorConfig,
queryDerivedChainData,
builder.programCacheSize > 0,
)
if err != nil {
return nil, err
}

err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
if err != nil {
Expand Down Expand Up @@ -951,6 +977,34 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return builder
}

// buildDerivedChainData creates the derived chain data for the indexer and the query engine
// If program caching is disabled, the function will return nil for the indexer cache, and a
// derived chain data object for the query engine cache.
func (builder *FlowAccessNodeBuilder) buildDerivedChainData() (
indexerCache *derived.DerivedChainData,
queryCache *derived.DerivedChainData,
err error,
) {
cacheSize := builder.programCacheSize

// the underlying cache requires size > 0. no data will be written so 1 is fine.
if cacheSize == 0 {
cacheSize = 1
}

derivedChainData, err := derived.NewDerivedChainData(cacheSize)
if err != nil {
return nil, nil, err
}

// writes are done by the indexer. using a nil value effectively disables writes to the cache.
if builder.programCacheSize == 0 {
return nil, derivedChainData, nil
}

return derivedChainData, derivedChainData, nil
}

func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder {
dist := consensuspubsub.NewFollowerDistributor()
dist.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
Expand Down Expand Up @@ -1203,6 +1257,19 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"script-execution-max-height",
defaultConfig.scriptExecMaxBlock,
"highest block height to allow for script execution. default: no limit")

flags.StringVar(&builder.registerCacheType,
"register-cache-type",
defaultConfig.registerCacheType,
"type of backend cache to use for registers (lru, arc, 2q)")
flags.UintVar(&builder.registerCacheSize,
"register-cache-size",
defaultConfig.registerCacheSize,
"number of registers to cache for script execution. default: 0 (no cache)")
flags.UintVar(&builder.programCacheSize,
"program-cache-size",
defaultConfig.programCacheSize,
"[experimental] number of blocks to cache for cadence programs. use 0 to disable cache. default: 0. Note: this is an experimental feature and may cause nodes to become unstable under certain workloads. Use with caution.")
}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down
76 changes: 71 additions & 5 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/onflow/flow-go/engine/common/follower"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/execution/computation/query"
"github.com/onflow/flow-go/fvm/storage/derived"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/bootstrap"
Expand Down Expand Up @@ -154,6 +155,9 @@ type ObserverServiceConfig struct {
executionDataConfig edrequester.ExecutionDataConfig
scriptExecMinBlock uint64
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
}

// DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig
Expand Down Expand Up @@ -228,6 +232,9 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
},
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
}

Expand Down Expand Up @@ -736,6 +743,18 @@ func (builder *ObserverServiceBuilder) extraFlags() {
defaultConfig.scriptExecMaxBlock,
"highest block height to allow for script execution. default: no limit")

flags.StringVar(&builder.registerCacheType,
"register-cache-type",
defaultConfig.registerCacheType,
"type of backend cache to use for registers (lru, arc, 2q)")
flags.UintVar(&builder.registerCacheSize,
"register-cache-size",
defaultConfig.registerCacheSize,
"number of registers to cache for script execution. default: 0 (no cache)")
flags.UintVar(&builder.programCacheSize,
"program-cache-size",
defaultConfig.programCacheSize,
"[experimental] number of blocks to cache for cadence programs. use 0 to disable cache. default: 0. Note: this is an experimental feature and may cause nodes to become unstable under certain workloads. Use with caution.")
}).ValidateFlags(func() error {
if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
Expand Down Expand Up @@ -1287,7 +1306,25 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, fmt.Errorf("could not create registers storage: %w", err)
}

builder.Storage.RegisterIndex = registers
if builder.registerCacheSize > 0 {
cacheType, err := pStorage.ParseCacheType(builder.registerCacheType)
if err != nil {
return nil, fmt.Errorf("could not parse register cache type: %w", err)
}
cacheMetrics := metrics.NewCacheCollector(builder.RootChainID)
registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
if err != nil {
return nil, fmt.Errorf("could not create registers cache: %w", err)
}
builder.Storage.RegisterIndex = registersCache
} else {
builder.Storage.RegisterIndex = registers
}

indexerDerivedChainData, queryDerivedChainData, err := builder.buildDerivedChainData()
if err != nil {
return nil, fmt.Errorf("could not create derived chain data: %w", err)
}

var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
indexerCore, err := indexer.New(
Expand All @@ -1300,6 +1337,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.Storage.Collections,
builder.Storage.Transactions,
builder.Storage.LightTransactionResults,
builder.RootChainID.Chain(),
indexerDerivedChainData,
collectionExecutedMetric,
)
if err != nil {
Expand Down Expand Up @@ -1331,18 +1370,17 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS

// create script execution module, this depends on the indexer being initialized and the
// having the register storage bootstrapped
scripts, err := execution.NewScripts(
scripts := execution.NewScripts(
builder.Logger,
metrics.NewExecutionCollector(builder.Tracer),
builder.RootChainID,
query.NewProtocolStateWrapper(builder.State),
builder.Storage.Headers,
builder.ExecutionIndexerCore.RegisterValue,
builder.scriptExecutorConfig,
queryDerivedChainData,
builder.programCacheSize > 0,
)
if err != nil {
return nil, err
}

err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
if err != nil {
Expand Down Expand Up @@ -1454,6 +1492,34 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder
}

// buildDerivedChainData creates the derived chain data for the indexer and the query engine
// If program caching is disabled, the function will return nil for the indexer cache, and a
// derived chain data object for the query engine cache.
func (builder *ObserverServiceBuilder) buildDerivedChainData() (
indexerCache *derived.DerivedChainData,
queryCache *derived.DerivedChainData,
err error,
) {
cacheSize := builder.programCacheSize

// the underlying cache requires size > 0. no data will be written so 1 is fine.
if cacheSize == 0 {
cacheSize = 1
}

derivedChainData, err := derived.NewDerivedChainData(cacheSize)
if err != nil {
return nil, nil, err
}

// writes are done by the indexer. using a nil value effectively disables writes to the cache.
if builder.programCacheSize == 0 {
return nil, derivedChainData, nil
}

return derivedChainData, derivedChainData, nil
}

// enqueuePublicNetworkInit enqueues the observer network component initialized for the observer
func (builder *ObserverServiceBuilder) enqueuePublicNetworkInit() {
var publicLibp2pNode p2p.LibP2PNode
Expand Down
7 changes: 7 additions & 0 deletions engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ func NewBlockComputer(
if maxConcurrency < 1 {
return nil, fmt.Errorf("invalid maxConcurrency: %d", maxConcurrency)
}

// this is a safeguard to prevent scripts from writing to the program cache on Execution nodes.
// writes are only allowed by transactions.
if vmCtx.AllowProgramCacheWritesInScripts {
return nil, fmt.Errorf("program cache writes are not allowed in scripts on Execution nodes")
}

systemChunkCtx := SystemChunkContext(vmCtx)
vmCtx = fvm.NewContextFromParent(
vmCtx,
Expand Down
12 changes: 12 additions & 0 deletions fvm/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Context struct {
tracing.TracerSpan

environment.EnvironmentParams

// AllowProgramCacheWritesInScripts determines if the program cache can be written to in scripts
// By default, the program cache is only updated by transactions.
AllowProgramCacheWritesInScripts bool
}

// NewContext initializes a new execution context with the provided options.
Expand Down Expand Up @@ -375,3 +379,11 @@ func WithEVMEnabled(enabled bool) Option {
return ctx
}
}

// WithAllowProgramCacheWritesInScriptsEnabled enables caching of programs accessed by scripts
func WithAllowProgramCacheWritesInScriptsEnabled(enabled bool) Option {
return func(ctx Context) Context {
ctx.AllowProgramCacheWritesInScripts = enabled
return ctx
}
}
9 changes: 7 additions & 2 deletions fvm/fvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/onflow/cadence"

"github.com/onflow/flow-go/fvm/environment"
errors "github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/fvm/meter"
"github.com/onflow/flow-go/fvm/storage"
"github.com/onflow/flow-go/fvm/storage/logical"
Expand Down Expand Up @@ -161,7 +161,12 @@ func (vm *VirtualMachine) Run(
var err error
switch proc.Type() {
case ScriptProcedureType:
storageTxn = blockDatabase.NewSnapshotReadTransaction(stateParameters)
if ctx.AllowProgramCacheWritesInScripts {
// if configured, allow scripts to update the programs cache
storageTxn, err = blockDatabase.NewCachingSnapshotReadTransaction(stateParameters)
} else {
storageTxn = blockDatabase.NewSnapshotReadTransaction(stateParameters)
}
case TransactionProcedureType, BootstrapProcedureType:
storageTxn, err = blockDatabase.NewTransaction(
proc.ExecutionTime(),
Expand Down
12 changes: 12 additions & 0 deletions fvm/storage/block_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (database *BlockDatabase) NewTransaction(
}, nil
}

// NewSnapshotReadTransaction creates a new readonly transaction.
func (database *BlockDatabase) NewSnapshotReadTransaction(
parameters state.StateParameters,
) Transaction {
Expand All @@ -78,6 +79,17 @@ func (database *BlockDatabase) NewSnapshotReadTransaction(
}
}

// NewCachingSnapshotReadTransaction creates a new readonly transaction that allows writing to the
// derived transaction data table.
func (database *BlockDatabase) NewCachingSnapshotReadTransaction(
parameters state.StateParameters,
) (Transaction, error) {
return &transaction{
TransactionData: database.BlockData.NewCachingSnapshotReadTransactionData(parameters),
DerivedTransactionData: database.DerivedBlockData.NewCachingSnapshotReadDerivedTransactionData(),
}, nil
}

func (txn *transaction) Validate() error {
err := txn.DerivedTransactionData.Validate()
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions fvm/storage/derived/derived_block_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ func (block *DerivedBlockData) NewChildDerivedBlockData() *DerivedBlockData {
}

func (block *DerivedBlockData) NewSnapshotReadDerivedTransactionData() *DerivedTransactionData {
txnPrograms := block.programs.NewSnapshotReadTableTransaction()

txnMeterParamOverrides := block.meterParamOverrides.NewSnapshotReadTableTransaction()
return &DerivedTransactionData{
programs: block.programs.NewSnapshotReadTableTransaction(),
meterParamOverrides: block.meterParamOverrides.NewSnapshotReadTableTransaction(),
}
}

func (block *DerivedBlockData) NewCachingSnapshotReadDerivedTransactionData() *DerivedTransactionData {
return &DerivedTransactionData{
programs: txnPrograms,
meterParamOverrides: txnMeterParamOverrides,
programs: block.programs.NewCachingSnapshotReadTableTransaction(),
meterParamOverrides: block.meterParamOverrides.NewCachingSnapshotReadTableTransaction(),
}
}

Expand Down
Loading

0 comments on commit 84a39b0

Please sign in to comment.