Skip to content

Commit

Permalink
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into access…
Browse files Browse the repository at this point in the history
…-cohort3-integration-tests-fix
  • Loading branch information
AndriiDiachuk committed May 20, 2024
2 parents 2daf2b2 + c925f53 commit 280a223
Show file tree
Hide file tree
Showing 140 changed files with 5,907 additions and 5,293 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ generate-mocks: install-mock-generators
CGO_CFLAGS=$(CRYPTO_FLAG) mockgen -destination=network/mocknetwork/mock_network.go -package=mocknetwork github.com/onflow/flow-go/network EngineRegistry
mockery --name=ExecutionDataStore --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock"
mockery --name=Downloader --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock"
mockery --name='.*' --dir=integration/benchmark/mocksiface --case=underscore --output="integration/benchmark/mock" --outpkg="mock"
mockery --name '(ExecutionDataRequester|IndexReporter)' --dir=module/state_synchronization --case=underscore --output="./module/state_synchronization/mock" --outpkg="state_synchronization"
mockery --name 'ExecutionState' --dir=engine/execution/state --case=underscore --output="engine/execution/state/mock" --outpkg="mock"
mockery --name 'BlockComputer' --dir=engine/execution/computation/computer --case=underscore --output="engine/execution/computation/computer/mock" --outpkg="mock"
Expand Down
5 changes: 1 addition & 4 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,10 +1118,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error {
for i := range txResults {
value := messageIndex.Value()
if ok := messageIndex.Set(value + 1); !ok {
return status.Errorf(codes.Internal, "the message index has already been incremented to %d", messageIndex.Value())
}
value := messageIndex.Increment()

err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{
TransactionResults: TransactionResultToMessage(txResults[i]),
Expand Down
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
// requester expects the initial last processed height, which is the first height - 1
builder.executionDataConfig.InitialBlockHeight = builder.executionDataStartHeight - 1
} else {
builder.executionDataConfig.InitialBlockHeight = builder.FinalizedRootBlock.Header.Height
builder.executionDataConfig.InitialBlockHeight = builder.SealedRootBlock.Header.Height
}

execDataDistributor = edrequester.NewExecutionDataDistributor()
Expand Down
2 changes: 1 addition & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func main() {
flags.DurationVar(&maxInterval, "max-interval", 90*time.Second, "the maximum amount of time between two blocks")
flags.UintVar(&maxSealPerBlock, "max-seal-per-block", 100, "the maximum number of seals to be included in a block")
flags.UintVar(&maxGuaranteePerBlock, "max-guarantee-per-block", 100, "the maximum number of collection guarantees to be included in a block")
flags.DurationVar(&hotstuffMinTimeout, "hotstuff-min-timeout", 2500*time.Millisecond, "the lower timeout bound for the hotstuff pacemaker, this is also used as initial timeout")
flags.DurationVar(&hotstuffMinTimeout, "hotstuff-min-timeout", 1045*time.Millisecond, "the lower timeout bound for the hotstuff pacemaker, this is also used as initial timeout")
flags.Float64Var(&hotstuffTimeoutAdjustmentFactor, "hotstuff-timeout-adjustment-factor", timeout.DefaultConfig.TimeoutAdjustmentFactor, "adjustment of timeout duration in case of time out event")
flags.Uint64Var(&hotstuffHappyPathMaxRoundFailures, "hotstuff-happy-path-max-round-failures", timeout.DefaultConfig.HappyPathMaxRoundFailures, "number of failed rounds before first timeout increase")
flags.DurationVar(&cruiseCtlFallbackProposalDurationFlag, "cruise-ctl-fallback-proposal-duration", cruiseCtlConfig.FallbackProposalDelay.Load(), "the proposal duration value to use when the controller is disabled, or in epoch fallback mode. In those modes, this value has the same as the old `--block-rate-delay`")
Expand Down
45 changes: 34 additions & 11 deletions cmd/dynamic_startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,45 +47,59 @@ func ValidateDynamicStartupFlags(accessPublicKey, accessAddress string, startPha
// DynamicStartPreInit is the pre-init func that will check if a node has already bootstrapped
// from a root protocol snapshot. If not attempt to get a protocol snapshot where the following
// conditions are met.
// 1. Target epoch < current epoch (in the past), set root snapshot to current snapshot
// 2. Target epoch == "current", wait until target phase == current phase before setting root snapshot
// 3. Target epoch > current epoch (in future), wait until target epoch and target phase is reached before
// 1. Target epoch < current epoch (in the past), set root snapshot to current snapshot
// 2. Target epoch == "current", wait until target phase == current phase before setting root snapshot
// 3. Target epoch > current epoch (in future), wait until target epoch and target phase is reached before
//
// setting root snapshot
func DynamicStartPreInit(nodeConfig *NodeConfig) error {
ctx := context.Background()

log := nodeConfig.Logger.With().Str("component", "dynamic-startup").Logger()

// skip dynamic startup if the protocol state is bootstrapped
// CASE 1: The state is already bootstrapped - nothing to do
isBootstrapped, err := badgerstate.IsBootstrapped(nodeConfig.DB)
if err != nil {
return fmt.Errorf("could not check if state is boostrapped: %w", err)
}
if isBootstrapped {
log.Info().Msg("protocol state already bootstrapped, skipping dynamic startup")
log.Debug().Msg("protocol state already bootstrapped, skipping dynamic startup")
return nil
}

// skip dynamic startup if a root snapshot file is specified - this takes priority
// CASE 2: The state is not already bootstrapped.
// We will either bootstrap from a file or using Dynamic Startup.
rootSnapshotPath := filepath.Join(nodeConfig.BootstrapDir, bootstrap.PathRootProtocolStateSnapshot)
if utilsio.FileExists(rootSnapshotPath) {
log.Info().
rootSnapshotFileExists := utilsio.FileExists(rootSnapshotPath)
dynamicStartupFlagsSet := anyDynamicStartupFlagsAreSet(nodeConfig)

// If the user has provided both a root snapshot file AND dynamic startup specification, return an error.
// Previously, the snapshot file would take precedence over the Dynamic Startup flags.
// This caused operators to inadvertently bootstrap from an old snapshot file when attempting to use Dynamic Startup.
// Therefore, we instead require the operator to explicitly choose one option or the other.
if rootSnapshotFileExists && dynamicStartupFlagsSet {
return fmt.Errorf("must specify either a root snapshot file (%s) or Dynamic Startup flags (--dynamic-startup-*) but not both", rootSnapshotPath)
}

// CASE 2.1: Use the root snapshot file to bootstrap.
if rootSnapshotFileExists {
log.Debug().
Str("root_snapshot_path", rootSnapshotPath).
Msg("protocol state is not bootstrapped, will bootstrap using configured root snapshot file, skipping dynamic startup")
return nil
}

// CASE 2.2: Use Dynamic Startup to bootstrap.

// get flow client with secure client connection to download protocol snapshot from access node
config, err := common.NewFlowClientConfig(nodeConfig.DynamicStartupANAddress, nodeConfig.DynamicStartupANPubkey, flow.ZeroID, false)
if err != nil {
return fmt.Errorf("failed to create flow client config for node dynamic startup pre-init: %w", err)
}

flowClient, err := common.FlowClient(config)
if err != nil {
return fmt.Errorf("failed to create flow client for node dynamic startup pre-init: %w", err)
}

getSnapshotFunc := func(ctx context.Context) (protocol.Snapshot, error) {
return common.GetSnapshot(ctx, flowClient)
}
Expand All @@ -95,7 +109,6 @@ func DynamicStartPreInit(nodeConfig *NodeConfig) error {
if err != nil {
return fmt.Errorf("failed to validate flag --dynamic-start-epoch: %w", err)
}

startupPhase := flow.GetEpochPhase(nodeConfig.DynamicStartupEpochPhase)

// validate the rest of the dynamic startup flags
Expand All @@ -121,6 +134,16 @@ func DynamicStartPreInit(nodeConfig *NodeConfig) error {
return nil
}

// anyDynamicStartupFlagsAreSet returns true if either the AN address or AN public key for Dynamic Startup are set.
// All other Dynamic Startup flags have default values (and aren't required) hence they aren't checked here.
// Both these flags must be set for Dynamic Startup to occur.
func anyDynamicStartupFlagsAreSet(config *NodeConfig) bool {
if len(config.DynamicStartupANAddress) > 0 || len(config.DynamicStartupANPubkey) > 0 {
return true
}
return false
}

// validateDynamicStartEpochFlags parse the start epoch flag and return the uin64 value,
// if epoch = current return the current epoch counter
func validateDynamicStartEpochFlags(ctx context.Context, getSnapshot common.GetProtocolSnapshot, flagEpoch string) (uint64, error) {
Expand Down
1 change: 1 addition & 0 deletions cmd/dynamic_startup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func getMockSnapshot(t *testing.T, epochCounter uint64, phase flow.EpochPhase) *
snapshot := new(protocolmock.Snapshot)
snapshot.On("Epochs").Return(epochQuery)
snapshot.On("Phase").Return(phase, nil)
snapshot.On("Head").Return(unittest.BlockHeaderFixture(), nil)

return snapshot
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
// requester expects the initial last processed height, which is the first height - 1
builder.executionDataConfig.InitialBlockHeight = builder.executionDataStartHeight - 1
} else {
builder.executionDataConfig.InitialBlockHeight = builder.FinalizedRootBlock.Header.Height
builder.executionDataConfig.InitialBlockHeight = builder.SealedRootBlock.Header.Height
}

execDataDistributor = edrequester.NewExecutionDataDistributor()
Expand Down
14 changes: 11 additions & 3 deletions cmd/util/cmd/common/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/rs/zerolog"
"github.com/sethvargo/go-retry"

"github.com/onflow/flow-go/utils/logging"

"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
Expand Down Expand Up @@ -88,10 +90,16 @@ func GetSnapshotAtEpochAndPhase(ctx context.Context, log zerolog.Logger, startup

// check if we are in or past the target epoch and phase
if currEpochCounter > startupEpoch || (currEpochCounter == startupEpoch && currEpochPhase >= startupEpochPhase) {
head, err := snapshot.Head()
if err != nil {
return fmt.Errorf("could not get Dynamic Startup snapshot header: %w", err)
}
log.Info().
Dur("time-waiting", time.Since(start)).
Uint64("current-epoch", currEpochCounter).
Str("current-epoch-phase", currEpochPhase.String()).
Dur("time_waiting", time.Since(start)).
Uint64("current_epoch", currEpochCounter).
Str("current_epoch_phase", currEpochPhase.String()).
Hex("finalized_root_block_id", logging.ID(head.ID())).
Uint64("finalized_block_height", head.Height).
Msg("finished dynamic startup - reached desired epoch and phase")

return nil
Expand Down
44 changes: 19 additions & 25 deletions cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"path"
"strings"

runtimeCommon "github.com/onflow/cadence/runtime/common"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/onflow/flow-go/cmd/util/cmd/common"
common2 "github.com/onflow/flow-go/cmd/util/common"
"github.com/onflow/flow-go/cmd/util/ledger/migrations"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/model/bootstrap"
Expand Down Expand Up @@ -48,6 +48,7 @@ var (
flagMaxAccountSize uint64
flagFixSlabsWithBrokenReferences bool
flagFilterUnreferencedSlabs bool
flagCPUProfile string
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -159,6 +160,9 @@ func init() {

Cmd.Flags().BoolVar(&flagFilterUnreferencedSlabs, "filter-unreferenced-slabs", false,
"filter unreferenced slabs")

Cmd.Flags().StringVar(&flagCPUProfile, "cpu-profile", "",
"enable CPU profiling")
}

func run(*cobra.Command, []string) {
Expand Down Expand Up @@ -255,24 +259,13 @@ func run(*cobra.Command, []string) {
}
}

var exportedAddresses []runtimeCommon.Address
var exportPayloadsForOwners map[string]struct{}

if len(flagOutputPayloadByAddresses) > 0 {

addresses := strings.Split(flagOutputPayloadByAddresses, ",")

for _, hexAddr := range addresses {
b, err := hex.DecodeString(strings.TrimSpace(hexAddr))
if err != nil {
log.Fatal().Err(err).Msgf("cannot hex decode address %s for payload export", strings.TrimSpace(hexAddr))
}

addr, err := runtimeCommon.BytesToAddress(b)
if err != nil {
log.Fatal().Err(err).Msgf("cannot decode address %x for payload export", b)
}

exportedAddresses = append(exportedAddresses, addr)
var err error
exportPayloadsForOwners, err = common2.ParseOwners(strings.Split(flagOutputPayloadByAddresses, ","))
if err != nil {
log.Fatal().Err(err).Msgf("failed to parse addresses")
}
}

Expand Down Expand Up @@ -330,12 +323,12 @@ func run(*cobra.Command, []string) {
var outputMsg string
if len(flagOutputPayloadFileName) > 0 {
// Output is payload file
if len(exportedAddresses) == 0 {
if len(exportPayloadsForOwners) == 0 {
outputMsg = fmt.Sprintf("exporting all payloads to %s", flagOutputPayloadFileName)
} else {
outputMsg = fmt.Sprintf(
"exporting payloads by addresses %v to %s",
flagOutputPayloadByAddresses,
"exporting payloads for owners %v to %s",
common2.OwnersToString(exportPayloadsForOwners),
flagOutputPayloadFileName,
)
}
Expand All @@ -351,15 +344,16 @@ func run(*cobra.Command, []string) {
log.Info().Msgf("state extraction plan: %s, %s", inputMsg, outputMsg)

chainID := chain.ChainID()
// TODO:
evmContractChange := migrations.EVMContractChangeNone

var burnerContractChange migrations.BurnerContractChange
burnerContractChange := migrations.BurnerContractChangeNone
evmContractChange := migrations.EVMContractChangeNone
switch chainID {
case flow.Emulator:
burnerContractChange = migrations.BurnerContractChangeDeploy
evmContractChange = migrations.EVMContractChangeDeploy
case flow.Testnet, flow.Mainnet:
burnerContractChange = migrations.BurnerContractChangeUpdate
evmContractChange = migrations.EVMContractChangeUpdate
}

stagedContracts, err := migrations.StagedContractsFromCSV(flagStagedContractsFile)
Expand Down Expand Up @@ -392,7 +386,7 @@ func run(*cobra.Command, []string) {
!flagNoMigration,
flagInputPayloadFileName,
flagOutputPayloadFileName,
exportedAddresses,
exportPayloadsForOwners,
flagSortPayloads,
opts,
)
Expand All @@ -405,7 +399,7 @@ func run(*cobra.Command, []string) {
flagNWorker,
!flagNoMigration,
flagOutputPayloadFileName,
exportedAddresses,
exportPayloadsForOwners,
flagSortPayloads,
opts,
)
Expand Down
Loading

0 comments on commit 280a223

Please sign in to comment.