diff --git a/Tiltfile b/Tiltfile index 4c1d74b1c..b765db324 100644 --- a/Tiltfile +++ b/Tiltfile @@ -320,6 +320,9 @@ for x in range(localnet_config["path_gateways"]["count"]): "--set=metrics.serviceMonitor.enabled=" + str(localnet_config["observability"]["enabled"]), "--set=path.mountConfigMaps[0].name=path-config-" + str(actor_number), "--set=path.mountConfigMaps[0].mountPath=/app/config/", + "--set=fullnameOverride=path" + str(actor_number), + "--set=nameOverride=path" + str(actor_number), + "--set=global.serviceAccount.name=path" + str(actor_number), ] if localnet_config["path_local_repo"]["enabled"]: diff --git a/load-testing/config/load_test_manifest_reader.go b/load-testing/config/load_test_manifest_reader.go index db20b21fc..81b94a630 100644 --- a/load-testing/config/load_test_manifest_reader.go +++ b/load-testing/config/load_test_manifest_reader.go @@ -22,7 +22,7 @@ type LoadTestManifestYAML struct { // IsEphemeralChain is a flag that indicates whether the test is expected to be // run on LocalNet or long-living remote chain (i.e. TestNet/DevNet). IsEphemeralChain bool `yaml:"is_ephemeral_chain"` - TestNetNode string `yaml:"testnet_node"` + RPCNode string `yaml:"rpc_node"` ServiceId string `yaml:"service_id"` Suppliers []ProvisionedActorConfig `yaml:"suppliers"` Gateways []ProvisionedActorConfig `yaml:"gateways"` @@ -67,6 +67,10 @@ func validatedEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTestM return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty funding account address") } + if len(manifest.RPCNode) == 0 { + return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty rpc node url") + } + for _, gateway := range manifest.Gateways { if len(gateway.Address) == 0 { return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty gateway address") @@ -107,8 +111,8 @@ func validatedNonEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTe return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("suppliers entry forbidden") } - if len(manifest.TestNetNode) == 0 { - return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty testnet node url") + if len(manifest.RPCNode) == 0 { + return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty rpc node url") } if len(manifest.ServiceId) == 0 { diff --git a/load-testing/loadtest_manifest_example.yaml b/load-testing/loadtest_manifest_example.yaml index 840fafdab..47c33f07c 100644 --- a/load-testing/loadtest_manifest_example.yaml +++ b/load-testing/loadtest_manifest_example.yaml @@ -2,16 +2,16 @@ # It is intended to target a remote environment, such as a devnet or testnet. is_ephemeral_chain: false -# testnet_node is the URL of the node that the load test will use to query the +# rpc_node is the URL of the RPC node that the load test will use to query the # chain and submit transactions. -testnet_node: https://devnet-sophon-validator-rpc.poktroll.com +rpc_node: https://devnet-sophon-validator-rpc.poktroll.com # The service ID to request relays from. service_id: "anvil" # The address of the account that will be used to fund the application accounts -# so that they can stake on the network. -funding_account_address: pokt1awtlw5sjmw2f5lgj8ekdkaqezphgz88rdk93sk # address for faucet account +# so that they can stake on the local network. +funding_account_address: pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw # address for faucet account # In non-ephemeral chains, the gateways are identified by their address. gateways: diff --git a/load-testing/loadtest_manifest_localnet.yaml b/load-testing/loadtest_manifest_localnet.yaml index 763771576..b2afa88f8 100644 --- a/load-testing/loadtest_manifest_localnet.yaml +++ b/load-testing/loadtest_manifest_localnet.yaml @@ -3,12 +3,16 @@ is_ephemeral_chain: true # This should be `true` for LocalNet as it is an ephemeral network +# rpc_node is the URL of the RPC node that the load test will use to query the +# chain and submit transactions. +rpc_node: http://localhost:26657 + # The service ID to use for the load test. service_id: anvil # The address of the account that will be used to fund the application, -# gateway and supplier accounts so that they can stake on the network. -funding_account_address: pokt1awtlw5sjmw2f5lgj8ekdkaqezphgz88rdk93sk # address for faucet account +# gateway and supplier accounts so that they can stake on the local network. +funding_account_address: pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw # address for faucet account # List of pre-provisioned suppliers used for load testing. # These suppliers will be progressively staked during the load test, according @@ -48,12 +52,12 @@ gateways: # Gateway 1; http://localhost:10350/r/gateway1/overview - address: pokt15vzxjqklzjtlz7lahe8z2dfe9nm5vxwwmscne4 - exposed_url: http://anvil.localhost/v1:3000 # The gateway url that the user sends relays to (e.g. curl) + exposed_url: http://localhost:3000/v1/ # The gateway url that the user sends relays to (e.g. curl) # Gateway 2; http://localhost:10350/r/gateway2/overview - address: pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz - exposed_url: http://anvil.localhost/v1:3001 + exposed_url: http://localhost:3001/v1/ # Gateway 3; http://localhost:10350/r/gateway3/overview - address: pokt1zhmkkd0rh788mc9prfq0m2h88t9ge0j83gnxya - exposed_url: http://anvil.localhost/v1:3002 + exposed_url: http://localhost:3002/v1/ diff --git a/load-testing/loadtest_manifest_localnet_single_supplier.yaml b/load-testing/loadtest_manifest_localnet_single_supplier.yaml index c455eaa8f..3e2112cea 100644 --- a/load-testing/loadtest_manifest_localnet_single_supplier.yaml +++ b/load-testing/loadtest_manifest_localnet_single_supplier.yaml @@ -3,12 +3,16 @@ is_ephemeral_chain: true # This should be `true` for LocalNet as it is an ephemeral network +# rpc_node is the URL of the RPC node that the load test will use to query the +# chain and submit transactions. +rpc_node: http://localhost:26657 + # The service ID to use for the load test. service_id: anvil # The address of the account that will be used to fund the application, -# gateway and supplier accounts so that they can stake on the network. -funding_account_address: pokt1awtlw5sjmw2f5lgj8ekdkaqezphgz88rdk93sk # address for faucet account +# gateway and supplier accounts so that they can stake on the local network. +funding_account_address: pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw # address for faucet account # List of pre-provisioned suppliers used for load testing. # These suppliers will be progressively staked during the load test, according @@ -40,12 +44,12 @@ gateways: # Gateway 1; http://localhost:10350/r/gateway1/overview - address: pokt15vzxjqklzjtlz7lahe8z2dfe9nm5vxwwmscne4 - exposed_url: http://anvil.localhost/v1:3000 # The gateway url that the user sends relays to (e.g. curl) + exposed_url: http://localhost:3000/v1/ # The gateway url that the user sends relays to (e.g. curl) # Gateway 2; http://localhost:10350/r/gateway2/overview - address: pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz - exposed_url: http://anvil.localhost/v1:3001 + exposed_url: http://localhost:3001/v1/ # Gateway 3; http://localhost:10350/r/gateway3/overview - address: pokt1zhmkkd0rh788mc9prfq0m2h88t9ge0j83gnxya - exposed_url: http://anvil.localhost/v1:3002 + exposed_url: http://localhost:3002/v1/ diff --git a/load-testing/tests/relays_stress.feature b/load-testing/tests/relays_stress.feature index 5aa3f63a0..d6981c95e 100644 --- a/load-testing/tests/relays_stress.feature +++ b/load-testing/tests/relays_stress.feature @@ -14,4 +14,12 @@ Feature: Loading gateway server with relays | gateway | 1 | 10 | 3 | | supplier | 1 | 10 | 3 | When a load of concurrent relay requests are sent from the applications - Then the correct pairs count of claim and proof messages should be committed on-chain \ No newline at end of file + Then the number of failed relay requests is "0" + # TODO_FOLLOWUP(@red-0ne): Implement the following steps + # Then "0" over servicing events are observed + # And "0" slashing events are observed + # And "0" expired claim events are observed + # And there is as many reimbursement requests as the number of settled claims + # And the number of claims submitted and claims settled is the same + # And the number of proofs submitted and proofs required is the same + # And the actors onchain balances are as expected \ No newline at end of file diff --git a/load-testing/tests/relays_stress_helpers_test.go b/load-testing/tests/relays_stress_helpers_test.go index aa55dcf49..88a5cae30 100644 --- a/load-testing/tests/relays_stress_helpers_test.go +++ b/load-testing/tests/relays_stress_helpers_test.go @@ -5,10 +5,11 @@ package tests import ( "context" "fmt" + "io" "net/http" - "net/url" "os" "path/filepath" + "slices" "strings" "sync" "testing" @@ -17,6 +18,9 @@ import ( "cosmossdk.io/depinject" "cosmossdk.io/math" "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/libs/json" + cmtcoretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" sdkclient "github.com/cosmos/cosmos-sdk/client" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" @@ -25,23 +29,26 @@ import ( "github.com/cosmos/cosmos-sdk/x/authz" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" + "github.com/cosmos/gogoproto/proto" "github.com/regen-network/gocuke" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "github.com/pokt-network/poktroll/load-testing/config" "github.com/pokt-network/poktroll/pkg/client" - "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" - "github.com/pokt-network/poktroll/pkg/client/tx" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/sync2" - testsession "github.com/pokt-network/poktroll/testutil/session" + testdelays "github.com/pokt-network/poktroll/testutil/delays" + "github.com/pokt-network/poktroll/testutil/events" "github.com/pokt-network/poktroll/testutil/testclient" - "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" + "github.com/pokt-network/poktroll/testutil/testclient/testblock" apptypes "github.com/pokt-network/poktroll/x/application/types" gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" + prooftypes "github.com/pokt-network/poktroll/x/proof/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" suppliertypes "github.com/pokt-network/poktroll/x/supplier/types" + tokenomicstypes "github.com/pokt-network/poktroll/x/tokenomics/types" ) // actorLoadTestIncrementPlans is a struct that holds the parameters for incrementing @@ -75,29 +82,53 @@ type actorLoadTestIncrementPlan struct { maxActorCount int64 } -// setupTxEventListeners sets up the transaction event listeners to observe the -// transactions committed on-chain. -func (s *relaysSuite) setupTxEventListeners() { - eventsQueryClient := testeventsquery.NewLocalnetClient(s.TestingT.(*testing.T)) +// setupEventListeners sets up the event listeners for the relays suite. +// It listens to both tx and block events to keep track of the events that are happening +// onchain. +func (s *relaysSuite) setupEventListeners(rpcNode string) { + // Set up the blockClient that will be notifying the suite about the committed blocks. + eventsObs, eventsObsCh := channel.NewObservable[[]types.Event]() + s.committedEventsObs = eventsObs + + extractBlockEvents := func(ctx context.Context, block client.Block) { + // Query the block results endpoint for each observed block to get the tx and block events. + // Ref: https://docs.cometbft.com/main/rpc/#/Info/block_results + blockResultsUrl := fmt.Sprintf("%s/block_results?height=%d", rpcNode, block.Height()) + blockResultsResp, err := http.DefaultClient.Get(blockResultsUrl) + require.NoError(s, err) - deps := depinject.Supply(eventsQueryClient) - eventsReplayClient, err := events.NewEventsReplayClient( - s.ctx, - deps, - newTxEventSubscriptionQuery, - tx.UnmarshalTxResult, - eventsReplayClientBufferSize, - ) - require.NoError(s, err) + defer blockResultsResp.Body.Close() + + blockResultsRespBz, err := io.ReadAll(blockResultsResp.Body) + require.NoError(s, err) + + var rpcResponse rpctypes.RPCResponse + err = json.Unmarshal(blockResultsRespBz, &rpcResponse) + require.NoError(s, err) + + var blockResults cmtcoretypes.ResultBlockResults + err = json.Unmarshal(rpcResponse.Result, &blockResults) + require.NoError(s, err) - // Map the eventsReplayClient.EventsSequence which is a replay observable - // to a regular observable to avoid replaying txResults from old blocks. - s.newTxEventsObs = channel.Map( + numEvents := len(blockResults.TxsResults) + len(blockResults.FinalizeBlockEvents) + events := make([]types.Event, 0, numEvents) + + // Flatten all tx result events and block event results into one slice. + for _, txResult := range blockResults.TxsResults { + events = append(events, txResult.Events...) + } + + events = append(events, blockResults.FinalizeBlockEvents...) + + s.latestBlock = block + eventsObsCh <- events + } + + s.blockClient = testblock.NewLocalnetClient(s.ctx, s.TestingT.(*testing.T)) + channel.ForEach( s.ctx, - eventsReplayClient.EventsSequence(s.ctx), - func(ctx context.Context, txResult *types.TxResult) (*types.TxResult, bool) { - return txResult, false - }, + s.blockClient.CommittedBlocksSequence(s.ctx), + extractBlockEvents, ) } @@ -173,9 +204,9 @@ func (s *relaysSuite) mapSessionInfoForLoadTestDurationFn( sessionInfo := &sessionInfoNotif{ blockHeight: blockHeight, - sessionNumber: testsession.GetSessionNumberWithDefaultParams(blockHeight), - sessionStartBlockHeight: testsession.GetSessionStartHeightWithDefaultParams(blockHeight), - sessionEndBlockHeight: testsession.GetSessionEndHeightWithDefaultParams(blockHeight), + sessionNumber: sharedtypes.GetSessionNumber(s.sharedParams, blockHeight), + sessionStartBlockHeight: sharedtypes.GetSessionStartHeight(s.sharedParams, blockHeight), + sessionEndBlockHeight: sharedtypes.GetSessionEndHeight(s.sharedParams, blockHeight), } infoLogger := logger.Info(). @@ -231,10 +262,12 @@ func (s *relaysSuite) mapSessionInfoForLoadTestDurationFn( testProgressBlocksRelativeToTestStartHeight, s.relayLoadDurationBlocks, ) - if sessionInfo.blockHeight == sessionInfo.sessionEndBlockHeight { - newSessionsCount := len(s.activeApplications) * len(s.activeSuppliers) - s.expectedClaimsAndProofsCount = s.expectedClaimsAndProofsCount + newSessionsCount - } + logger.Info().Msgf( + "Relays sent: %d; Success: %d; Failed: %d", + s.numRelaysSent.Load(), + s.successfulRelays.Load(), + s.failedRelays.Load(), + ) // If the current block is the start of any new session, activate the prepared // actors to be used in the current session. @@ -457,16 +490,17 @@ func (s *relaysSuite) mapSessionInfoWhenStakingNewSuppliersAndGatewaysFn() chann // For each notification received, it waits for the new actors' staking/funding // txs to be committed before sending staking & delegation txs for new applications. func (s *relaysSuite) mapStakingInfoWhenStakingAndDelegatingNewApps( - _ context.Context, + ctx context.Context, notif *stakingInfoNotif, ) (*stakingInfoNotif, bool) { // Ensure that new gateways and suppliers are staked. // Ensure that new applications are funded and have an account entry on-chain // so that they can stake and delegate in the next block. - txResults := s.waitForTxsToBeCommitted() - s.ensureFundedActors(txResults, notif.newApps) - s.ensureStakedActors(txResults, EventActionMsgStakeGateway, notif.newGateways) - s.ensureStakedActors(txResults, EventActionMsgStakeSupplier, notif.newSuppliers) + testdelays.WaitAll( + func() { s.ensureStakedActors(ctx, notif.newSuppliers) }, + func() { s.ensureStakedActors(ctx, notif.newGateways) }, + func() { s.ensureFundedActors(ctx, notif.newApps) }, + ) // Update the list of staked suppliers. s.activeSuppliers = append(s.activeSuppliers, notif.newSuppliers...) @@ -627,11 +661,13 @@ func (s *relaysSuite) createApplicationAccount( // cost, and the block duration. func (s *relaysSuite) getAppFundingAmount(currentBlockHeight int64) sdk.Coin { currentTestDuration := s.testStartHeight + s.relayLoadDurationBlocks - currentBlockHeight + // Compute the cost of all relays throughout the test duration. + totalRelayCostDuringTestUPOKT := s.relayRatePerApp * s.relayCoinAmountCost * currentTestDuration * blockDurationSec // Multiply by 2 to make sure the application does not run out of funds // based on the number of relays it needs to send. Theoretically, `+1` should // be enough, but probabilistic and time based mechanisms make it hard // to predict exactly. - appFundingAmount := s.relayRatePerApp * s.relayCoinAmountCost * currentTestDuration * blockDuration * 2 + appFundingAmount := math.Max(totalRelayCostDuringTestUPOKT, s.appParams.MinStake.Amount.Int64()*2) return sdk.NewCoin("upokt", math.NewInt(appFundingAmount)) } @@ -724,7 +760,7 @@ func (plan *actorLoadTestIncrementPlan) shouldIncrementActorCount( return false } - initialSessionNumber := testsession.GetSessionNumberWithDefaultParams(startBlockHeight) + initialSessionNumber := sharedtypes.GetSessionNumber(sharedParams, startBlockHeight) actorSessionIncRate := plan.blocksPerIncrement / int64(sharedParams.GetNumBlocksPerSession()) nextSessionNumber := sessionInfo.sessionNumber + 1 - initialSessionNumber isSessionStartHeight := sessionInfo.blockHeight == sessionInfo.sessionStartBlockHeight @@ -750,7 +786,7 @@ func (plan *actorLoadTestIncrementPlan) shouldIncrementSupplierCount( return false } - initialSessionNumber := testsession.GetSessionNumberWithDefaultParams(startBlockHeight) + initialSessionNumber := sharedtypes.GetSessionNumber(sharedParams, startBlockHeight) supplierSessionIncRate := plan.blocksPerIncrement / int64(sharedParams.GetNumBlocksPerSession()) nextSessionNumber := sessionInfo.sessionNumber + 1 - initialSessionNumber isSessionEndHeight := sessionInfo.blockHeight == sessionInfo.sessionEndBlockHeight @@ -798,6 +834,9 @@ func (s *relaysSuite) addPendingStakeSupplierMsg(supplier *accountInfo) { RpcType: sharedtypes.RPCType_JSON_RPC, }, }, + RevShare: []*sharedtypes.ServiceRevenueShare{ + {Address: supplier.address, RevSharePercentage: 100}, + }, }, }, )) @@ -943,7 +982,9 @@ func (s *relaysSuite) sendPendingMsgsTx(actor *accountInfo) { err := txBuilder.SetMsgs(actor.pendingMsgs...) require.NoError(s, err) - txBuilder.SetTimeoutHeight(uint64(s.latestBlock.Height() + 1)) + // Set the transaction timeout height to 2 blocks beyond the current block height. + // This ensures the transaction won't be rejected if the next block commit is imminent. + txBuilder.SetTimeoutHeight(uint64(s.latestBlock.Height() + 2)) txBuilder.SetGasLimit(690000042) accAddress := sdk.MustAccAddressFromBech32(actor.address) @@ -973,33 +1014,6 @@ func (s *relaysSuite) sendPendingMsgsTx(actor *accountInfo) { }() } -// waitForTxsToBeCommitted waits for transactions to be observed on-chain. -// It is used to ensure that the transactions are committed before taking -// dependent actions. -func (s *relaysSuite) waitForTxsToBeCommitted() (txResults []*types.TxResult) { - ctx, cancel := context.WithCancel(s.ctx) - defer cancel() - - ch := s.newTxEventsObs.Subscribe(ctx).Ch() - for { - txResult := <-ch - txResults = append(txResults, txResult) - - // The number of transactions to be observed is not available in the TxResult - // event, so this number is taken from the last block event. - // The block received from s.latestBlock may be the previous one, it is - // necessary to wait until the block matching the txResult height is received - // in order to get the right number of transaction events to collect. - numTxs := s.waitUntilLatestBlockHeightEquals(txResult.Height) - - // If all transactions are observed, break the loop. - if len(txResults) == numTxs { - break - } - } - return txResults -} - // waitUntilLatestBlockHeightEquals blocks until s.latestBlock.Height() equals the targetHeight. // NB: s.latestBlock is updated asynchronously via a subscription to the block client observable. func (s *relaysSuite) waitUntilLatestBlockHeightEquals(targetHeight int64) int { @@ -1034,161 +1048,215 @@ func (s *relaysSuite) sendRelay(iteration uint64, relayPayload string) (appAddre gateway := s.activeGateways[iteration%uint64(len(s.activeGateways))] application := s.activeApplications[iteration%uint64(len(s.activeApplications))] - gatewayUrl, err := url.Parse(s.gatewayUrls[gateway.address]) - require.NoError(s, err) - - // Include the application address in the query to the gateway. - query := gatewayUrl.Query() - query.Add("applicationAddr", application.address) - query.Add("relayCount", fmt.Sprintf("%d", iteration)) - gatewayUrl.RawQuery = query.Encode() - - // Use the pre-defined service ID that all application and suppliers are staking for. - gatewayUrl.Path = testedServiceId - // TODO_MAINNET: Capture the relay response to check for failing relays. // Send the relay request within a goroutine to avoid blocking the test batches // when suppliers or gateways are unresponsive. - go func(gwURL, payload string) { - _, err = http.DefaultClient.Post( - gwURL, - "application/json", - strings.NewReader(payload), - ) + sendRelayRequest := func(gatewayURL, appAddr, payload string) { + req, err := http.NewRequest("POST", gatewayURL, strings.NewReader(payload)) + + // TODO_TECHDEBT(red-0ne): Use 'app-address' instead of 'X-App-Address' once PATH Gateway + // deprecates the X-App-Address header. + // This is needed by the PATH Gateway's trusted mode to identify the application + // that is sending the relay request. + req.Header.Add("X-App-Address", appAddr) + req.Header.Add("target-service-id", "anvil") + res, err := http.DefaultClient.Do(req) require.NoError(s, err) - }(gatewayUrl.String(), relayPayload) + + if res.StatusCode == http.StatusOK { + s.successfulRelays.Add(1) + } else { + s.failedRelays.Add(1) + } + } + + gatewayURL := s.gatewayUrls[gateway.address] + go sendRelayRequest(gatewayURL, application.address, relayPayload) return application.address, gateway.address } // ensureFundedActors checks if the actors are funded by observing the transfer events // in the transactions results. -func (s *relaysSuite) ensureFundedActors( - txResults []*types.TxResult, - actors []*accountInfo, -) { - for _, actor := range actors { - actorFunded := false - for _, txResult := range txResults { - for _, event := range txResult.Result.Events { - // Skip non-relevant events. - if event.Type != "transfer" { - continue - } - - attrs := event.Attributes - // Check if the actor is the recipient of the transfer event. - if actorFunded = hasEventAttr(attrs, "recipient", actor.address); actorFunded { - break - } +func (s *relaysSuite) ensureFundedActors(ctx context.Context, actors []*accountInfo) { + if len(actors) == 0 { + s.Logf("No actors to fund") + return + } + + fundedActors := make(map[string]struct{}) + actorsAddrs := make([]string, len(actors)) + for i, actor := range actors { + actorsAddrs[i] = actor.address + } + + // Add 1 second to the block duration to make sure the deadline is after the next block. + deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1)) + ctx, cancel := context.WithDeadline(ctx, deadline) + channel.ForEach(ctx, s.committedEventsObs, func(ctx context.Context, events []types.Event) { + for _, event := range events { + // In the context of ensuring the actors are funded, only the transfer events + // are relevant; filtering out the other events. + if event.GetType() != "transfer" { + continue + } + + attrs := event.GetAttributes() + // Check if the actor is the recipient of the transfer event. + fundedActorAddr, ok := getEventAttr(attrs, "recipient") + if !ok { + continue } - // If the actor is funded, no need to check the other transactions. - if actorFunded { - break + if !slices.Contains(actorsAddrs, fundedActorAddr) { + continue } + + fundedActors[fundedActorAddr] = struct{}{} } - // If no transfer event is found for the actor, the test is canceled. - if !actorFunded { - s.logAndAbortTest(txResults, "actor not funded") - return + // Cancel this scope once all expected actors are successfully funded before + // the deadline was reached. + if allActorsFunded(actors, fundedActors) { + cancel() } + }) + + <-ctx.Done() + if !allActorsFunded(actors, fundedActors) { + s.logAndAbortTest("at least one actor was not funded successfully") } } +// allActorsFunded checks if all the expected actors are funded. +// An error is returned if any (at least one) of the expected actors was not funded. +func allActorsFunded(expectedActors []*accountInfo, fundedActors map[string]struct{}) bool { + for _, actor := range expectedActors { + if _, ok := fundedActors[actor.address]; !ok { + return false + } + } + + return true +} + // ensureStakedActors checks if the actors are staked by observing the message events // in the transactions results. func (s *relaysSuite) ensureStakedActors( - txResults []*types.TxResult, - msg string, + ctx context.Context, actors []*accountInfo, ) { - for _, actor := range actors { - actorStaked := false - for _, txResult := range txResults { - for _, event := range txResult.Result.Events { - // Skip non-relevant events. - if event.Type != "message" { - continue - } - - attrs := event.Attributes - // Check if the actor is the sender of the message event. - if hasEventAttr(attrs, "action", msg) && hasEventAttr(attrs, "sender", actor.address) { - actorStaked = true - break - } - } + if len(actors) == 0 { + return + } - // If the actor is staked, no need to check the other transactions. - if actorStaked { - break + stakedActors := make(map[string]struct{}) + + // Add 1 second to the block duration to make sure the deadline is after the next block. + deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1)) + ctx, cancel := context.WithDeadline(ctx, deadline) + typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs) + channel.ForEach(ctx, typedEventsObs, func(ctx context.Context, blockEvents []proto.Message) { + for _, event := range blockEvents { + switch e := event.(type) { + case *suppliertypes.EventSupplierStaked: + stakedActors[e.Supplier.GetOperatorAddress()] = struct{}{} + case *gatewaytypes.EventGatewayStaked: + stakedActors[e.Gateway.GetAddress()] = struct{}{} + case *apptypes.EventApplicationStaked: + stakedActors[e.Application.GetAddress()] = struct{}{} } } - // If no message event is found for the actor, log the transaction results - // and cancel the test. - if !actorStaked { - s.logAndAbortTest(txResults, fmt.Sprintf("actor not staked: %s", actor.address)) - return + // Cancel this scope once all expected actors are successfully staked before + // the deadline was reached. + if allActorsStaked(actors, stakedActors) { + cancel() + } + }) + + <-ctx.Done() + if !allActorsStaked(actors, stakedActors) { + s.logAndAbortTest("at least one actor was not staked successfully") + return + } +} + +// allActorsStaked checks if all the expected actors are staked. +// An error is returned if any of the expected actors was not staked. +func allActorsStaked(expectedActors []*accountInfo, stakedActors map[string]struct{}) bool { + for _, actor := range expectedActors { + if _, ok := stakedActors[actor.address]; !ok { + return false } } + + return true } // ensureDelegatedActors checks if the actors are delegated by observing the // delegation events in the transactions results. func (s *relaysSuite) ensureDelegatedApps( - txResults []*types.TxResult, + ctx context.Context, applications, gateways []*accountInfo, ) { - for _, application := range applications { - numDelegatees := 0 - for _, txResult := range txResults { - for _, event := range txResult.Result.Events { - // Skip non-EventDelegation events. - if event.Type != EventTypeRedelegation { - continue - } - - attrs := event.Attributes - appAddr := fmt.Sprintf("%q", application.address) - // Skip the event if the application is not the delegator. - if !hasEventAttr(attrs, "app_address", appAddr) { - break - } - - // Check if the application is delegated to each of the gateways. - for _, gateway := range gateways { - gwAddr := fmt.Sprintf("%q", gateway.address) - if hasEventAttr(attrs, "gateway_address", gwAddr) { - numDelegatees++ - break - } - } + if len(applications) == 0 || len(gateways) == 0 { + return + } + + appsToGateways := make(map[string][]string) + + deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1)) + ctx, cancel := context.WithDeadline(ctx, deadline) + typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs) + channel.ForEach(ctx, typedEventsObs, func(ctx context.Context, blockEvents []proto.Message) { + for _, event := range blockEvents { + redelegationEvent, ok := event.(*apptypes.EventRedelegation) + if ok { + app := redelegationEvent.GetApplication() + appsToGateways[app.GetAddress()] = app.GetDelegateeGatewayAddresses() } } - // If the number of delegatees is not equal to the number of gateways, - // the test is canceled. - if numDelegatees != len(gateways) { - s.logAndAbortTest(txResults, "applications not delegated to all gateways") - return + // Cancel this scope once all expected applications are successfully delegated + // to the expected gateways before the deadline was reached. + if allAppsDelegatedToAllGateways(applications, gateways, appsToGateways) { + cancel() } + }) + + <-ctx.Done() + if !allAppsDelegatedToAllGateways(applications, gateways, appsToGateways) { + s.logAndAbortTest("applications not delegated to all gateways") + return } } +// allAppsDelegatedToAllGateways checks if all applications are delegated to all gateways. +func allAppsDelegatedToAllGateways( + applications, gateways []*accountInfo, + appsToGateways map[string][]string, +) bool { + for _, app := range applications { + if _, ok := appsToGateways[app.address]; !ok { + return false + } + + for _, gateway := range gateways { + if !slices.Contains(appsToGateways[app.address], gateway.address) { + return false + } + } + } + + return true +} + // getRelayCost fetches the relay cost from the tokenomics module. func (s *relaysSuite) getRelayCost() int64 { - // Set up the tokenomics client. - flagSet := testclient.NewLocalnetFlagSet(s) - clientCtx := testclient.NewLocalnetClientCtx(s, flagSet) - sharedClient := sharedtypes.NewQueryClient(clientCtx) - - res, err := sharedClient.Params(s.ctx, &sharedtypes.QueryParamsRequest{}) - require.NoError(s, err) + relayCost := s.testedService.ComputeUnitsPerRelay * s.sharedParams.ComputeUnitsToTokensMultiplier - return int64(res.Params.ComputeUnitsToTokensMultiplier) + return int64(relayCost) } // getProvisionedActorsCurrentStakedAmount fetches the current stake amount of @@ -1242,15 +1310,15 @@ func (s *relaysSuite) activatePreparedActors(notif *sessionInfoNotif) { } } -// hasEventAttr checks if the event attributes contain a given key-value pair. -func hasEventAttr(attributes []types.EventAttribute, key, value string) bool { +// getEventAttr returns the event attribute value corresponding to the provided key. +func getEventAttr(attributes []types.EventAttribute, key string) (value string, found bool) { for _, attribute := range attributes { - if attribute.Key == key && attribute.Value == value { - return true + if attribute.Key == key { + return attribute.Value, true } } - return false + return "", false } // sendAdjustMaxDelegationsParamTx sends a transaction to adjust the max_delegated_gateways @@ -1258,20 +1326,17 @@ func hasEventAttr(attributes []types.EventAttribute, key, value string) bool { func (s *relaysSuite) sendAdjustMaxDelegationsParamTx(maxGateways int64) { authority := authtypes.NewModuleAddress(govtypes.ModuleName).String() - appMsgUpdateParams := &apptypes.MsgUpdateParams{ + appMsgUpdateMaxDelegatedGatewaysParam := &apptypes.MsgUpdateParam{ Authority: authority, - Params: apptypes.Params{ - // Set the max_delegated_gateways parameter to the number of gateways - // that are currently used in the test. - MaxDelegatedGateways: uint64(maxGateways), - }, + Name: "max_delegated_gateways", + AsType: &apptypes.MsgUpdateParam_AsUint64{AsUint64: uint64(maxGateways)}, } - appMsgUpdateParamsAny, err := codectypes.NewAnyWithValue(appMsgUpdateParams) + appMsgUpdateParamAny, err := codectypes.NewAnyWithValue(appMsgUpdateMaxDelegatedGatewaysParam) require.NoError(s, err) authzExecMsg := &authz.MsgExec{ Grantee: s.fundingAccountInfo.address, - Msgs: []*codectypes.Any{appMsgUpdateParamsAny}, + Msgs: []*codectypes.Any{appMsgUpdateParamAny}, } s.fundingAccountInfo.addPendingMsg(authzExecMsg) @@ -1334,27 +1399,15 @@ func (s *relaysSuite) parseActorLoadTestIncrementPlans( return actorPlans } -// countClaimAndProofs asynchronously counts the number of claim and proof messages -// in the observed transaction events. -func (s *relaysSuite) countClaimAndProofs() { +// forEachSettlement asynchronously captures the settlement events and processes them. +func (s *relaysSuite) forEachSettlement(ctx context.Context) { + typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs) channel.ForEach( s.ctx, - s.newTxEventsObs, - func(ctx context.Context, txEvent *types.TxResult) { - for _, event := range txEvent.Result.Events { - if event.Type != "message" { - continue - } - - if hasEventAttr(event.Attributes, "action", EventActionMsgCreateClaim) { - s.currentClaimCount++ - } - - if hasEventAttr(event.Attributes, "action", EventActionMsgSubmitProof) { - s.currentProofCount++ - } - - } + typedEventsObs, + func(_ context.Context, _ []proto.Message) { + // TODO_FOLLOWUP(@red-0ne): Capture all settlement related events and use + // them to calculate the expected actor balances. }, ) } @@ -1379,19 +1432,107 @@ func (s *relaysSuite) querySharedParams(queryNodeRPCURL string) { s.sharedParams = sharedParams } +// queryAppParams queries the current on-chain application module parameters for use +// over the duration of the test. +func (s *relaysSuite) queryAppParams(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + appQueryclient, err := query.NewApplicationQuerier(deps) + require.NoError(s, err) + + appParams, err := appQueryclient.GetParams(s.ctx) + require.NoError(s, err) + + s.appParams = appParams +} + +// queryProofParams queries the current on-chain proof module parameters for use +// over the duration of the test. +func (s *relaysSuite) queryProofParams(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + proofQueryclient, err := query.NewProofQuerier(deps) + require.NoError(s, err) + + params, err := proofQueryclient.GetParams(s.ctx) + require.NoError(s, err) + + // The proofQueryclient#GetParams returns an Params interface to avoid a circular + // dependency between the proof module and the query module, so it needs to be casted + // to the actual prooftypes.Params type. + proofParams, ok := params.(*prooftypes.Params) + require.True(s, ok) + + s.proofParams = proofParams +} + +// queryTokenomicsParams queries the current on-chain tokenomics module parameters for use +// over the duration of the test. +func (s *relaysSuite) queryTokenomicsParams(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + // TODO_TECHDEBT(red-0ne): Use tokenomics client querier instead of the grpc client + // once implemented. + var clientConn *grpc.ClientConn + err = depinject.Inject(deps, clientConn) + require.NoError(s, err) + + tokenomicsQuerier := tokenomicstypes.NewQueryClient(clientConn) + res, err := tokenomicsQuerier.Params(s.ctx, &tokenomicstypes.QueryParamsRequest{}) + require.NoError(s, err) + + s.tokenomicsParams = &res.Params +} + +// queryTestedService queries the current service being tested. +func (s *relaysSuite) queryTestedService(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + serviceQueryclient, err := query.NewServiceQuerier(deps) + require.NoError(s, err) + + service, err := serviceQueryclient.GetService(s.ctx, "anvil") + require.NoError(s, err) + + s.testedService = &service +} + // forEachStakedAndDelegatedAppPrepareApp is a ForEachFn that waits for txs which // were broadcast in previous pipeline stages have been committed. It ensures that // new applications were successfully staked and all application actors are delegated // to all gateways. Then it adds the new application actors to the prepared set, to // be activated & used in the next session. -func (s *relaysSuite) forEachStakedAndDelegatedAppPrepareApp(_ context.Context, notif *stakingInfoNotif) { - // Wait for the next block to commit staking and delegation transactions - // and be able to send relay requests evenly distributed across all gateways. - txResults := s.waitForTxsToBeCommitted() - s.ensureStakedActors(txResults, EventActionMsgStakeApplication, notif.newApps) - s.ensureDelegatedApps(txResults, s.activeApplications, notif.newGateways) - s.ensureDelegatedApps(txResults, notif.newApps, notif.newGateways) - s.ensureDelegatedApps(txResults, notif.newApps, s.activeGateways) +func (s *relaysSuite) forEachStakedAndDelegatedAppPrepareApp(ctx context.Context, notif *stakingInfoNotif) { + testdelays.WaitAll( + func() { s.ensureStakedActors(ctx, notif.newApps) }, + func() { s.ensureDelegatedApps(ctx, s.activeApplications, notif.newGateways) }, + func() { s.ensureDelegatedApps(ctx, notif.newApps, notif.newGateways) }, + func() { s.ensureDelegatedApps(ctx, notif.newApps, s.activeGateways) }, + ) // Add the new applications to the list of prepared applications to be activated in // the next session. @@ -1415,9 +1556,9 @@ func (s *relaysSuite) forEachRelayBatchSendBatch(_ context.Context, relayBatchIn relayInterval := time.Second / time.Duration(relaysPerSec) batchWaitGroup := new(sync.WaitGroup) - batchWaitGroup.Add(relaysPerSec * int(blockDuration)) + batchWaitGroup.Add(relaysPerSec * int(blockDurationSec)) - for i := 0; i < relaysPerSec*int(blockDuration); i++ { + for i := 0; i < relaysPerSec*int(blockDurationSec); i++ { iterationTime := relayBatchInfo.nextBatchTime.Add(time.Duration(i+1) * relayInterval) batchLimiter.Go(s.ctx, func() { @@ -1453,17 +1594,12 @@ func (s *relaysSuite) forEachRelayBatchSendBatch(_ context.Context, relayBatchIn batchWaitGroup.Wait() } -func (s *relaysSuite) logAndAbortTest(txResults []*types.TxResult, errorMsg string) { - for _, txResult := range txResults { - if txResult.Result.Log != "" { - logger.Error().Msgf("tx result log: %s", txResult.Result.Log) - } - } +func (s *relaysSuite) logAndAbortTest(errorMsg string) { s.cancelCtx() s.Fatal(errorMsg) } -// populateWithKnownApplications creates a list of gateways based on the gatewayUrls +// populateWithKnownGateways creates a list of gateways based on the gatewayUrls // provided in the test manifest. It is used in non-ephemeral chain tests where the // gateways are not under the test's control and are expected to be already staked. func (s *relaysSuite) populateWithKnownGateways() (gateways []*accountInfo) { diff --git a/load-testing/tests/relays_stress_single_suppier.feature b/load-testing/tests/relays_stress_single_supplier.feature similarity index 51% rename from load-testing/tests/relays_stress_single_suppier.feature rename to load-testing/tests/relays_stress_single_supplier.feature index 253b82615..34d51a6dd 100644 --- a/load-testing/tests/relays_stress_single_suppier.feature +++ b/load-testing/tests/relays_stress_single_supplier.feature @@ -11,7 +11,16 @@ Feature: Loading gateway server with relays And more actors are staked as follows: | actor | actor inc amount | blocks per inc | max actors | | application | 4 | 10 | 12 | - | gateway | 1 | 10 | 3 | + | gateway | 1 | 10 | 1 | | supplier | 1 | 10 | 1 | When a load of concurrent relay requests are sent from the applications - Then the correct pairs count of claim and proof messages should be committed on-chain \ No newline at end of file + Then the number of failed relay requests is "0" + # TODO_FOLLOWUP(@red-0ne): Implement the following steps + # Then "0" over servicing events are observed + # And "0" slashing events are observed + # And "0" expired claim events are observed + # And there are as many reimbursement requests as the number of settled claims + # And the number of claims submitted and claims settled is the same + # And the number of proofs submitted and proofs required is the same + # And the actors onchain balances are as expected + # TODO_CONSIDERATION: Revisit for additional interesting test cases. \ No newline at end of file diff --git a/load-testing/tests/relays_stress_test.go b/load-testing/tests/relays_stress_test.go index ae7f07a12..7a455f896 100644 --- a/load-testing/tests/relays_stress_test.go +++ b/load-testing/tests/relays_stress_test.go @@ -23,23 +23,11 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/testutil/testclient" - "github.com/pokt-network/poktroll/testutil/testclient/testblock" "github.com/pokt-network/poktroll/testutil/testclient/testtx" + apptypes "github.com/pokt-network/poktroll/x/application/types" + prooftypes "github.com/pokt-network/poktroll/x/proof/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" -) - -// The following constants are used to identify the different types of transactions, -// once committed, which are expected to be observed on-chain during the test. -// NB: The TxResult Events' #Type values are not prefixed with a slash, -// unlike TxResult Events' "action" attribute value. -const ( - EventActionMsgStakeApplication = "/poktroll.application.MsgStakeApplication" - EventActionMsgStakeGateway = "/poktroll.gateway.MsgStakeGateway" - EventActionMsgStakeSupplier = "/poktroll.supplier.MsgStakeSupplier" - EventActionMsgCreateClaim = "/poktroll.proof.MsgCreateClaim" - EventActionMsgSubmitProof = "/poktroll.proof.MsgSubmitProof" - EventActionAppMsgUpdateParams = "/poktroll.application.MsgUpdateParams" - EventTypeRedelegation = "poktroll.application.EventRedelegation" + tokenomicstypes "github.com/pokt-network/poktroll/x/tokenomics/types" ) // The following constants define the expected ordering of the actors when @@ -77,8 +65,6 @@ var ( // maxConcurrentRequestLimit is the maximum number of concurrent requests that can be made. // By default, it is set to the number of logical CPUs available to the process. maxConcurrentRequestLimit = runtime.GOMAXPROCS(0) - // fundingAccountAddress is the address of the account used to fund other accounts. - fundingAccountAddress string // supplierStakeAmount is the amount of tokens to stake by suppliers. supplierStakeAmount sdk.Coin // gatewayStakeAmount is the amount of tokens to stake by gateways. @@ -86,15 +72,9 @@ var ( // testedServiceId is the service ID for that all applications and suppliers will // be using in this test. testedServiceId string - // blockDuration is the duration of a block in seconds. + // blockDurationSec is the duration of a block in seconds. // NB: This value SHOULD be equal to `timeout_propose` in `config.yml`. - blockDuration = int64(2) - // newTxEventSubscriptionQuery is the format string which yields a subscription - // query to listen for on-chain Tx events. - newTxEventSubscriptionQuery = "tm.event='Tx'" - // eventsReplayClientBufferSize is the buffer size for the events replay client - // for the subscriptions above. - eventsReplayClientBufferSize = 100 + blockDurationSec = int64(2) // relayPayloadFmt is the JSON-RPC request relayPayloadFmt to send a relay request. relayPayloadFmt = `{"jsonrpc":"2.0","method":"%s","params":[],"id":%d}` // relayRequestMethod is the method of the JSON-RPC request to be relayed. @@ -127,15 +107,17 @@ type relaysSuite struct { // batchInfoObs is the observable mapping session information to batch information. // It is used to determine when to send a batch of relay requests to the network. batchInfoObs observable.Observable[*relayBatchInfoNotif] - // newTxEventsObs is the observable that notifies the test suite of new - // transactions committed on-chain. - // It is used to check the results of the transactions sent by the test suite. - newTxEventsObs observable.Observable[*types.TxResult] // txContext is the transaction context used to sign and send transactions. txContext client.TxContext - // sharedParams is the shared on-chain parameters used in the test. + + // Protocol governance params used in the test. // It is queried at the beginning of the test. - sharedParams *sharedtypes.Params + sharedParams *sharedtypes.Params + appParams *apptypes.Params + proofParams *prooftypes.Params + tokenomicsParams *tokenomicstypes.Params + + testedService *sharedtypes.Service // numRelaysSent is the number of relay requests sent during the test. numRelaysSent atomic.Uint64 @@ -213,17 +195,17 @@ type relaysSuite struct { // ready to handle relay requests. activeSuppliers []*accountInfo - // Number of claims and proofs observed on-chain during the test. - currentProofCount int - currentClaimCount int - - // expectedClaimsAndProofsCount is the expected number of claims and proofs - // to be committed on-chain during the test. - expectedClaimsAndProofsCount int - // isEphemeralChain is a flag that indicates whether the test is expected to be // run on ephemeral chain setups like localnet or long living ones (i.e. Test/DevNet). isEphemeralChain bool + + // committedEventsObs is the observable that maps committed blocks to on-chain events. + committedEventsObs observable.Observable[[]types.Event] + + // successfulRelays is the number of relay requests that returned 200 status code. + successfulRelays atomic.Uint64 + // failedRelays is the number of relay requests that returned non-200 status code. + failedRelays atomic.Uint64 } // accountInfo contains the account info needed to build and send transactions. @@ -270,8 +252,8 @@ func TestLoadRelays(t *testing.T) { gocuke.NewRunner(t, &relaysSuite{}).Path(filepath.Join(".", "relays_stress.feature")).Run() } -func TestLoadRelaysSingleSupplier(t *testing.T) { - gocuke.NewRunner(t, &relaysSuite{}).Path(filepath.Join(".", "relays_stress_single_suppier.feature")).Run() +func TestSingleSupplierLoadRelays(t *testing.T) { + gocuke.NewRunner(t, &relaysSuite{}).Path(filepath.Join(".", "relays_stress_single_supplier.feature")).Run() } func (s *relaysSuite) LocalnetIsRunning() { @@ -318,9 +300,9 @@ func (s *relaysSuite) LocalnetIsRunning() { // CometLocalWebsocketURL to the TestNetNode URL. These variables are used // by the testtx txClient to send transactions to the network. if !s.isEphemeralChain { - testclient.CometLocalTCPURL = loadTestParams.TestNetNode + testclient.CometLocalTCPURL = loadTestParams.RPCNode - webSocketURL, err := url.Parse(loadTestParams.TestNetNode) + webSocketURL, err := url.Parse(loadTestParams.RPCNode) require.NoError(s, err) // TestNet nodes may be exposed over HTTPS, so adjust the scheme accordingly. @@ -332,37 +314,31 @@ func (s *relaysSuite) LocalnetIsRunning() { testclient.CometLocalWebsocketURL = webSocketURL.String() + "/websocket" // Update the block duration when running the test on a non-ephemeral chain. - // TODO_TECHDEBT: Get the block duration value from the chain or the manifest. - blockDuration = 60 + // TODO_TECHDEBT: Get the block duration value from the chain. + blockDurationSec = 60 } - // Set up the blockClient that will be notifying the suite about the committed blocks. - s.blockClient = testblock.NewLocalnetClient(s.ctx, s.TestingT.(*testing.T)) - channel.ForEach( - s.ctx, - s.blockClient.CommittedBlocksSequence(s.ctx), - func(ctx context.Context, block client.Block) { - s.latestBlock = block - }, - ) - // Setup the txContext that will be used to send transactions to the network. s.txContext = testtx.NewLocalnetContext(s.TestingT.(*testing.T)) - // Get the relay cost from the tokenomics module. - s.relayCoinAmountCost = s.getRelayCost() - - // Setup the tx listener for on-chain events to check and assert on transactions results. - s.setupTxEventListeners() + // Setup the event listener for on-chain events to check and assert on transactions + // and finalized blocks results. + s.setupEventListeners(loadTestParams.RPCNode) // Initialize the funding account. s.initFundingAccount(loadTestParams.FundingAccountAddress) - // Initialize the on-chain claims and proofs counter. - s.countClaimAndProofs() + // Initialize the on-chain settlement events listener. + s.forEachSettlement(s.ctx) - // Query for the current shared on-chain params. - s.querySharedParams(loadTestParams.TestNetNode) + // Query for the current network on-chain params. + s.querySharedParams(loadTestParams.RPCNode) + s.queryAppParams(loadTestParams.RPCNode) + s.queryProofParams(loadTestParams.RPCNode) + s.queryTestedService(loadTestParams.RPCNode) + + // Get the relay cost from the tokenomics module. + s.relayCoinAmountCost = s.getRelayCost() // Some suppliers may already be staked at genesis, ensure that staking during // this test succeeds by increasing the sake amount. @@ -402,62 +378,50 @@ func (s *relaysSuite) MoreActorsAreStakedAsFollows(table gocuke.DataTable) { // increment the actor count to the maximum. s.relayLoadDurationBlocks = s.plans.maxActorBlocksToFinalIncrementEnd() - if s.isEphemeralChain { - // Adjust the max delegations parameter to the max gateways to permit all - // applications to delegate to all gateways. - // This is to ensure that requests are distributed evenly across all gateways - // at any given time. - s.sendAdjustMaxDelegationsParamTx(s.plans.gateways.maxActorCount) - s.waitForTxsToBeCommitted() - s.ensureUpdatedMaxDelegations(s.plans.gateways.maxActorCount) - } - // Fund all the provisioned suppliers and gateways since their addresses are // known and they are not created on the fly, while funding only the initially // created applications. fundedSuppliers, fundedGateways, fundedApplications := s.sendFundAvailableActorsTx() // Funding messages are sent in a single transaction by the funding account, // only one transaction is expected to be committed. - txResults := s.waitForTxsToBeCommitted() - s.ensureFundedActors(txResults, fundedSuppliers) - s.ensureFundedActors(txResults, fundedGateways) - s.ensureFundedActors(txResults, fundedApplications) + fundedActors := append(fundedSuppliers, fundedGateways...) + fundedActors = append(fundedActors, fundedApplications...) + s.ensureFundedActors(s.ctx, fundedActors) logger.Info().Msg("Actors funded") // The initial actors are the first actors to stake. - suppliers := fundedSuppliers[:s.supplierInitialCount] - gateways := fundedGateways[:s.gatewayInitialCount] - applications := fundedApplications[:s.appInitialCount] + stakedSuppliers := fundedSuppliers[:s.supplierInitialCount] + stakedGateways := fundedGateways[:s.gatewayInitialCount] + stakedApplications := fundedApplications[:s.appInitialCount] + + stakedActors := append(stakedSuppliers, stakedGateways...) + stakedActors = append(stakedActors, stakedApplications...) - s.sendInitialActorsStakeMsgs(suppliers, gateways, applications) - txResults = s.waitForTxsToBeCommitted() - s.ensureStakedActors(txResults, EventActionMsgStakeSupplier, suppliers) - s.ensureStakedActors(txResults, EventActionMsgStakeGateway, gateways) - s.ensureStakedActors(txResults, EventActionMsgStakeApplication, applications) + s.sendInitialActorsStakeMsgs(stakedSuppliers, stakedGateways, stakedApplications) + s.ensureStakedActors(s.ctx, stakedActors) logger.Info().Msg("Actors staked") // Update the list of staked suppliers. - s.activeSuppliers = append(s.activeSuppliers, suppliers...) + s.activeSuppliers = append(s.activeSuppliers, stakedSuppliers...) // In the case of non-ephemeral chain load tests, the available gateways are // not incrementally staked, but are already staked and delegated to, add all // of them to the list of active gateways at the beginning of the test. if !s.isEphemeralChain { - gateways = s.populateWithKnownGateways() + stakedGateways = s.populateWithKnownGateways() } // Delegate the initial applications to the initial gateways - s.sendDelegateInitialAppsTxs(applications, gateways) - txResults = s.waitForTxsToBeCommitted() - s.ensureDelegatedApps(txResults, applications, gateways) + s.sendDelegateInitialAppsTxs(stakedApplications, stakedGateways) + s.ensureDelegatedApps(s.ctx, stakedApplications, stakedGateways) logger.Info().Msg("Apps delegated") // Applications and gateways are now ready and will be active in the next session. - s.preparedApplications = append(s.preparedApplications, applications...) - s.preparedGateways = append(s.preparedGateways, gateways...) + s.preparedApplications = append(s.preparedApplications, stakedApplications...) + s.preparedGateways = append(s.preparedGateways, stakedGateways...) // relayBatchInfoObs maps session information to batch information used to schedule // the relay requests to be sent on the current block. @@ -508,29 +472,11 @@ func (s *relaysSuite) ALoadOfConcurrentRelayRequestsAreSentFromTheApplications() // Block the feature step until the test is done. <-s.ctx.Done() } -func (s *relaysSuite) TheCorrectPairsCountOfClaimAndProofMessagesShouldBeCommittedOnchain() { - logger.Info(). - Int("claims", s.currentClaimCount). - Int("proofs", s.currentProofCount). - Msg("Claims and proofs count") - - require.Equal(s, - s.currentClaimCount, - s.currentProofCount, - "claims and proofs count mismatch", - ) - // TODO_TECHDEBT: The current counting mechanism for the expected claims and proofs - // is not accurate. The expected claims and proofs count should be calculated based - // on a function of(time_per_block, num_blocks_per_session) -> num_claims_and_proofs. - // The reason (time_per_block) is one of the parameters is because claims and proofs - // are removed from the on-chain state after sessions are settled, only leaving - // events behind. The final solution needs to either account for this timing - // carefully (based on sessions that have passed), or be event driven using - // a replay client of on-chain messages and/or events. - //require.Equal(s, - // s.expectedClaimsAndProofsCount, - // s.currentProofCount, - // "unexpected claims and proofs count", - //) +func (s *relaysSuite) TheNumberOfFailedRelayRequestsIs(expectedFailedRelays string) { + expectedFailedRelaysCount, err := strconv.ParseUint(expectedFailedRelays, 10, 64) + require.NoError(s, err) + + require.EqualValues(s, expectedFailedRelaysCount, s.failedRelays.Load()) + require.EqualValues(s, s.numRelaysSent.Load(), s.successfulRelays.Load()) } diff --git a/makefiles/tests.mk b/makefiles/tests.mk index 7bb292e3d..b1962c845 100644 --- a/makefiles/tests.mk +++ b/makefiles/tests.mk @@ -53,13 +53,13 @@ test_load_relays_stress_custom: ## Run the stress test for E2E relays using cust .PHONY: test_load_relays_stress_localnet test_load_relays_stress_localnet: test_e2e_env warn_message_local_stress_test ## Run the stress test for E2E relays on LocalNet. go test -v -count=1 ./load-testing/tests/... \ - -tags=load,test -run LoadRelays --log-level=debug --timeout=30m \ + -tags=load,test -run TestLoadRelays --log-level=debug --timeout=30m \ --manifest ./load-testing/loadtest_manifest_localnet.yaml .PHONY: test_load_relays_stress_localnet_single_supplier test_load_relays_stress_localnet_single_supplier: test_e2e_env warn_message_local_stress_test ## Run the stress test for E2E relays on LocalNet using exclusively one supplier. go test -v -count=1 ./load-testing/tests/... \ - -tags=load,test -run TestLoadRelaysSingleSupplier --log-level=debug --timeout=30m \ + -tags=load,test -run TestSingleSupplierLoadRelays --log-level=debug --timeout=30m \ --manifest ./load-testing/loadtest_manifest_localnet_single_supplier.yaml .PHONY: test_verbose diff --git a/pkg/client/interface.go b/pkg/client/interface.go index f98c39fef..953b6b47a 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -279,6 +279,9 @@ type ApplicationQueryClient interface { // GetAllApplications queries all on-chain applications GetAllApplications(ctx context.Context) ([]apptypes.Application, error) + + // GetParams queries the chain for the application module parameters. + GetParams(ctx context.Context) (*apptypes.Params, error) } // SupplierQueryClient defines an interface that enables the querying of the @@ -349,7 +352,7 @@ type ProofParams interface { // ProofQueryClient defines an interface that enables the querying of the // on-chain proof module params. type ProofQueryClient interface { - // GetParams queries the chain for the current shared module parameters. + // GetParams queries the chain for the current proof module parameters. GetParams(ctx context.Context) (ProofParams, error) } diff --git a/pkg/client/query/appquerier.go b/pkg/client/query/appquerier.go index 9477c35f9..9150af87c 100644 --- a/pkg/client/query/appquerier.go +++ b/pkg/client/query/appquerier.go @@ -62,3 +62,13 @@ func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Applic } return res.Applications, nil } + +// GetParams returns the application module parameters +func (aq *appQuerier) GetParams(ctx context.Context) (*apptypes.Params, error) { + req := apptypes.QueryParamsRequest{} + res, err := aq.applicationQuerier.Params(ctx, &req) + if err != nil { + return nil, err + } + return &res.Params, nil +} diff --git a/testutil/delays/waitall.go b/testutil/delays/waitall.go new file mode 100644 index 000000000..30ae68287 --- /dev/null +++ b/testutil/delays/waitall.go @@ -0,0 +1,23 @@ +package testdelays + +import "sync" + +// WaitAll waits for all the provided functions to complete. +// It is used to wait for multiple goroutines to complete before proceeding. +func WaitAll(waitFuncs ...func()) { + if len(waitFuncs) == 0 { + return + } + + var wg sync.WaitGroup + wg.Add(len(waitFuncs)) + + for _, fn := range waitFuncs { + go func(f func()) { + defer wg.Done() + f() + }(fn) + } + + wg.Wait() +} diff --git a/testutil/events/filter.go b/testutil/events/filter.go index ced4617af..5f16aa1ea 100644 --- a/testutil/events/filter.go +++ b/testutil/events/filter.go @@ -1,6 +1,7 @@ package events import ( + "context" "strconv" "strings" "testing" @@ -8,6 +9,8 @@ import ( abci "github.com/cometbft/cometbft/abci/types" cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/stretchr/testify/require" ) @@ -74,3 +77,24 @@ func NewEventTypeMatchFn(matchEventType string) func(*cosmostypes.Event) bool { return strings.Trim(event.Type, "/") == strings.Trim(matchEventType, "/") } } + +// AbciEventsToTypedEvents converts the abci events to typed events. +func AbciEventsToTypedEvents( + ctx context.Context, + abciEventObs observable.Observable[[]abci.Event], +) observable.Observable[[]proto.Message] { + return channel.Map(ctx, abciEventObs, func(ctx context.Context, events []abci.Event) ([]proto.Message, bool) { + var typedEvents []proto.Message + for _, event := range events { + // TODO_TECHDEBT: Filter out events by event.Type before parsing them. + typedEvent, err := cosmostypes.ParseTypedEvent(event) + if err != nil { + continue + } + + typedEvents = append(typedEvents, typedEvent) + } + + return typedEvents, false + }) +} diff --git a/testutil/keeper/tokenomics.go b/testutil/keeper/tokenomics.go index e4cef7328..409a92f4e 100644 --- a/testutil/keeper/tokenomics.go +++ b/testutil/keeper/tokenomics.go @@ -182,6 +182,11 @@ func TokenomicsKeeperWithActorAddrs(t testing.TB) ( Return(nil). AnyTimes() + mockApplicationKeeper.EXPECT(). + GetParams(gomock.Any()). + Return(apptypes.Params{}). + AnyTimes() + // Mock the supplier keeper. mockSupplierKeeper := mocks.NewMockSupplierKeeper(ctrl) // Mock SetSupplier. diff --git a/x/proof/types/application_query_client.go b/x/proof/types/application_query_client.go index 1cd887314..c9599f2df 100644 --- a/x/proof/types/application_query_client.go +++ b/x/proof/types/application_query_client.go @@ -42,3 +42,9 @@ func (appQueryClient *AppKeeperQueryClient) GetApplication( func (appQueryClient *AppKeeperQueryClient) GetAllApplications(ctx context.Context) ([]apptypes.Application, error) { return appQueryClient.keeper.GetAllApplications(ctx), nil } + +// GetParams returns the application module parameters. +func (appQueryClient *AppKeeperQueryClient) GetParams(ctx context.Context) (*apptypes.Params, error) { + params := appQueryClient.keeper.GetParams(ctx) + return ¶ms, nil +} diff --git a/x/proof/types/expected_keepers.go b/x/proof/types/expected_keepers.go index 9d1fd765e..e2981ce1a 100644 --- a/x/proof/types/expected_keepers.go +++ b/x/proof/types/expected_keepers.go @@ -49,6 +49,7 @@ type ApplicationKeeper interface { GetApplication(ctx context.Context, address string) (app apptypes.Application, found bool) GetAllApplications(ctx context.Context) []apptypes.Application SetApplication(context.Context, apptypes.Application) + GetParams(ctx context.Context) (params apptypes.Params) } // SharedKeeper defines the expected interface needed to retrieve shared information. diff --git a/x/tokenomics/keeper/settle_pending_claims.go b/x/tokenomics/keeper/settle_pending_claims.go index fa1995de2..6ff10b702 100644 --- a/x/tokenomics/keeper/settle_pending_claims.go +++ b/x/tokenomics/keeper/settle_pending_claims.go @@ -233,6 +233,7 @@ func (k Keeper) SettlePendingClaims(ctx cosmostypes.Context) ( NumEstimatedComputeUnits: numEstimatedComputeUnits, ClaimedUpokt: &claimeduPOKT, ProofRequirement: proofRequirement, + SettlementResult: *ClaimSettlementResult, } if err = ctx.EventManager().EmitTypedEvent(&claimSettledEvent); err != nil { diff --git a/x/tokenomics/types/expected_keepers.go b/x/tokenomics/types/expected_keepers.go index c7c0de0c5..9d555f4e2 100644 --- a/x/tokenomics/types/expected_keepers.go +++ b/x/tokenomics/types/expected_keepers.go @@ -46,6 +46,7 @@ type ApplicationKeeper interface { GetAllApplications(ctx context.Context) []apptypes.Application UnbondApplication(ctx context.Context, app *apptypes.Application) error EndBlockerUnbondApplications(ctx context.Context) error + GetParams(ctx context.Context) (params apptypes.Params) } type ProofKeeper interface {