From b149e303dfe4c693758f2a605041b0b6858345d9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gianguido=20Sor=C3=A0?=
Date: Fri, 15 Dec 2023 12:41:11 +0100
Subject: [PATCH] app: interact with API more often (#44)

* app: interact with API more often

Instead of waiting until *all* partial exits have been calculated
before posting them to the API, post them whenever we have the chance.

Also, download and process full exits on the epoch boundary instead of
waiting for the previous process to complete.

* cleanup: make sure to flush out all exits when no more keys need to be
processed and we're at the epoch boundary

* cleanup
---
 app/app.go                       | 135 ++++++++++++++++++-------------
 app/app_internal_test.go         |   2 +-
 app/app_test.go                  |  85 ++++++++++++++++++-
 app/util/testutil/api_servers.go |  18 ++++-
 4 files changed, 177 insertions(+), 63 deletions(-)

diff --git a/app/app.go b/app/app.go
index 94ad766..c6e1edf 100644
--- a/app/app.go
+++ b/app/app.go
@@ -20,9 +20,9 @@ import (
 	"github.com/jonboulle/clockwork"
 	"github.com/obolnetwork/charon/app/errors"
 	"github.com/obolnetwork/charon/app/eth2wrap"
-	"github.com/obolnetwork/charon/app/forkjoin"
 	"github.com/obolnetwork/charon/app/log"
 	"github.com/obolnetwork/charon/app/z"
+	manifestpb "github.com/obolnetwork/charon/cluster/manifestpb/v1"
 	"github.com/obolnetwork/charon/eth2util/signing"
 	"github.com/obolnetwork/charon/p2p"
 	"github.com/obolnetwork/charon/tbls"
@@ -46,6 +46,7 @@ type Config struct {
 
 const (
 	maxBeaconNodeTimeout = 10 * time.Second
+	obolAPITimeout       = 10 * time.Second
 )
 
 // Run runs the lido-dv-exit core logic.
@@ -109,9 +110,28 @@ func Run(ctx context.Context, config Config) error {
 	oAPI := obolapi.Client{ObolAPIUrl: config.ObolAPIURL}
 
 	var signedExits []obolapi.ExitBlob
+	fetchedSignedExits := map[string]struct{}{}
 
 	for slot := range slotTicker {
 		if len(valsKeys) == 0 {
+			// if we don't have any more keys to process, make sure we've fetched *all* full exits
+			// before exiting this loop
+			if len(fetchedSignedExits) != len(signedExits) {
+				writeAllFullExits(
+					ctx,
+					bnClient,
+					oAPI,
+					cl,
+					signedExits,
+					fetchedSignedExits,
+					config.EjectorExitPath,
+					shareIdx,
+					identityKey,
+				)
+
+				continue
+			}
+
 			break // we finished signing everything we had to sign
 		}
 
@@ -120,6 +140,11 @@ func Run(ctx context.Context, config Config) error {
 			continue
 		}
 
+		log.Info(ctx, "Signing exit for available validators...", z.Int("available_validators", len(valsKeys)))
+
+		// signedExitsInRound holds the signed partial exits generated during this epoch's lifecycle round
+		var signedExitsInRound []obolapi.ExitBlob
+
 		phase0Vals, err := valsKeys.ValidatorsPhase0()
 		if err != nil {
 			return errors.Wrap(err, "validator keys to phase0")
@@ -168,75 +193,76 @@ func Run(ctx context.Context, config Config) error {
 				continue
 			}
 
-			log.Debug(ctx, "Signed exit")
-			signedExits = append(signedExits, obolapi.ExitBlob{
+			log.Info(ctx, "Signed partial exit")
+			signedExitsInRound = append(signedExitsInRound, obolapi.ExitBlob{
 				PublicKey:         validatorPubkStr,
 				SignedExitMessage: exit,
 			})
-
-			delete(valsKeys, keystore.ValidatorPubkey(validatorPubkStr))
 		}
-	}
 
-	tick := time.NewTicker(1 * time.Second)
+		if len(signedExitsInRound) != 0 {
+			// try posting the partial exits that have been produced at this stage
+			if err := postPartialExit(ctx, oAPI, cl.GetInitialMutationHash(), shareIdx, identityKey, signedExitsInRound...); err != nil {
+				log.Error(ctx, "Cannot post exits to obol api, will retry later", err)
+			} else {
+				for _, signedExit := range signedExitsInRound {
+					delete(valsKeys, keystore.ValidatorPubkey(signedExit.PublicKey))
+				}
+			}
 
-	// send signed exit to obol api
-	for range tick.C {
-		// we're retrying every second until we succeed
-		if postPartialExit(ctx, oAPI, cl.GetInitialMutationHash(), shareIdx, identityKey, signedExits...) {
-			tick.Stop()
-			break
+			signedExits = append(signedExits, signedExitsInRound...)
 		}
-	}
 
-	type fetchExitData struct {
-		lockHash        []byte
-		validatorPubkey string
-		shareIndex      uint64
-		identityKey     *k1.PrivateKey
+		writeAllFullExits(
+			ctx,
+			bnClient,
+			oAPI,
+			cl,
+			signedExits,
+			fetchedSignedExits,
+			config.EjectorExitPath,
+			shareIdx,
+			identityKey,
+		)
 	}
 
-	fork, join, fjcancel := forkjoin.New(ctx, func(ctx context.Context, data fetchExitData) (struct{}, error) {
-		tick := time.NewTicker(1 * time.Second)
-		defer tick.Stop()
+	log.Info(ctx, "Successfully fetched exit messages!")
 
-		exitFSPath := filepath.Join(config.EjectorExitPath, fmt.Sprintf("validator-exit-%s.json", data.validatorPubkey))
+	return nil
+}
 
-		for range tick.C {
-			if fetchFullExit(ctx, bnClient, oAPI, data.lockHash, data.validatorPubkey, exitFSPath, data.shareIndex, data.identityKey) {
-				break
-			}
+// writeAllFullExits fetches the full exits for signedExits and writes them to disk, skipping already-fetched ones.
+func writeAllFullExits(
+	ctx context.Context,
+	eth2Cl eth2wrap.Client,
+	oAPI obolapi.Client,
+	cl *manifestpb.Cluster,
+	signedExits []obolapi.ExitBlob,
+	alreadySignedExits map[string]struct{},
+	ejectorExitPath string,
+	shareIndex uint64,
+	identityKey *k1.PrivateKey,
+) {
+	for _, signedExit := range signedExits {
+		if _, ok := alreadySignedExits[signedExit.PublicKey]; ok {
+			continue // skip already-fetched full exit
 		}
 
-		return struct{}{}, nil
-	}, forkjoin.WithWorkers(10), forkjoin.WithWaitOnCancel())
+		exitFSPath := filepath.Join(ejectorExitPath, fmt.Sprintf("validator-exit-%s.json", signedExit.PublicKey))
 
-	defer fjcancel()
-
-	for _, se := range signedExits {
-		fork(fetchExitData{
-			lockHash:        cl.InitialMutationHash,
-			validatorPubkey: se.PublicKey,
-			shareIndex:      shareIdx,
-			identityKey:     identityKey,
-		})
-	}
-
-	_, err = join().Flatten()
+		if !fetchFullExit(ctx, eth2Cl, oAPI, cl.InitialMutationHash, signedExit.PublicKey, exitFSPath, shareIndex, identityKey) {
+			log.Debug(ctx, "Could not fetch full exit for validator", z.Str("validator", signedExit.PublicKey))
+			continue
+		}
 
-	if err != nil && !errors.Is(err, context.Canceled) {
-		return errors.Wrap(err, "fatal error while processing full exits from obol api, please get in contact with the development team as soon as possible, with a full log of the execution")
+		alreadySignedExits[signedExit.PublicKey] = struct{}{}
 	}
-
-	log.Info(ctx, "Successfully fetched exit messages!")
-
-	return nil
 }
 
 // fetchFullExit returns true if a full exit was received from the Obol API, and was written in exitFSPath.
-// Each HTTP request has a 10 seconds timeout.
+// Each HTTP request has a default timeout.
func fetchFullExit(ctx context.Context, eth2Cl eth2wrap.Client, oAPI obolapi.Client, lockHash []byte, validatorPubkey, exitFSPath string, shareIndex uint64, identityKey *k1.PrivateKey) bool { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + ctx, cancel := context.WithTimeout(ctx, obolAPITimeout) defer cancel() fullExit, err := oAPI.GetFullExit(ctx, validatorPubkey, lockHash, shareIndex, identityKey) @@ -300,17 +326,16 @@ func fetchFullExit(ctx context.Context, eth2Cl eth2wrap.Client, oAPI obolapi.Cli return true } -func postPartialExit(ctx context.Context, oAPI obolapi.Client, mutationHash []byte, shareIndex uint64, identityKey *k1.PrivateKey, exitBlobs ...obolapi.ExitBlob) bool { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) +// postPartialExit posts exitBlobs to Obol API with a default HTTP request timeout. +func postPartialExit(ctx context.Context, oAPI obolapi.Client, mutationHash []byte, shareIndex uint64, identityKey *k1.PrivateKey, exitBlobs ...obolapi.ExitBlob) error { + ctx, cancel := context.WithTimeout(ctx, obolAPITimeout) defer cancel() - // we're retrying every second until we succeed if err := oAPI.PostPartialExit(ctx, mutationHash, shareIndex, identityKey, exitBlobs...); err != nil { - log.Error(ctx, "Cannot post exits to obol api", err) - return false + return errors.Wrap(err, "cannot post partial exit") } - return true + return nil } // shouldProcessValidator returns true if a validator needs to be processed, meaning a full exit message must diff --git a/app/app_internal_test.go b/app/app_internal_test.go index 6c4c396..d99d6cf 100644 --- a/app/app_internal_test.go +++ b/app/app_internal_test.go @@ -72,7 +72,7 @@ func Test_newSlotTicker(t *testing.T) { cluster.WithVersion("v1.7.0"), ) - srvs := ldetestutil.APIServers(t, lock) + srvs := ldetestutil.APIServers(t, lock, false) defer srvs.Close() clock := clockwork.NewFakeClock() diff --git a/app/app_test.go b/app/app_test.go index a83773c..080fd67 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "testing" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" @@ -43,7 +44,7 @@ func Test_NormalFlow(t *testing.T) { cluster.WithVersion("v1.7.0"), ) - srvs := testutil.APIServers(t, lock) + srvs := testutil.APIServers(t, lock, false) defer srvs.Close() run(t, @@ -53,6 +54,35 @@ func Test_NormalFlow(t *testing.T) { keyShares, true, srvs, + false, + ) +} + +func Test_WithNonActiveVals(t *testing.T) { + valAmt := 100 + operatorAmt := 4 + + lock, enrs, keyShares := cluster.NewForT( + t, + valAmt, + operatorAmt, + operatorAmt, + 0, + cluster.WithVersion("v1.7.0"), + ) + + srvs := testutil.APIServers(t, lock, true) + defer srvs.Close() + + td := t.TempDir() + run(t, + td, + lock, + enrs, + keyShares, + true, + srvs, + true, ) } @@ -69,7 +99,7 @@ func Test_RunTwice(t *testing.T) { cluster.WithVersion("v1.7.0"), ) - srvs := testutil.APIServers(t, lock) + srvs := testutil.APIServers(t, lock, false) defer srvs.Close() root := t.TempDir() @@ -81,6 +111,7 @@ func Test_RunTwice(t *testing.T) { keyShares, true, srvs, + false, ) // delete half exits from each ejector directory @@ -104,6 +135,7 @@ func Test_RunTwice(t *testing.T) { keyShares, false, srvs, + false, ) } @@ -115,6 +147,7 @@ func run( keyShares [][]tbls.PrivateKey, createDirFiles bool, servers testutil.TestServers, + withNonActiveVals bool, ) { t.Helper() @@ -177,7 +210,8 @@ func run( eg := errgroup.Group{} - ctx := context.Background() + ctx, cancel := 
context.WithCancel(context.Background())
+	defer cancel()
 
 	for opIdx := 0; opIdx < operatorAmt; opIdx++ {
 		opIdx := opIdx
@@ -190,7 +224,50 @@ func run(
 		})
 	}
 
-	require.NoError(t, eg.Wait())
+	egErrorChan := make(chan error)
+	halfExitsErrorChan := make(chan error)
+
+	go func() {
+		egErrorChan <- eg.Wait()
+	}()
+
+	if withNonActiveVals {
+		// when withNonActiveVals is true, only half of the validators are active, so
+		// we expect only half of the full exits to be produced.
+		go func() {
+			stop := false
+
+			for !stop {
+				for opIdx := 0; opIdx < len(enrs); opIdx++ {
+					opID := fmt.Sprintf("op%d", opIdx)
+
+					ejectorDir := filepath.Join(ejectorDir, opID)
+					files, err := os.ReadDir(ejectorDir)
+					require.NoError(t, err)
+
+					if len(files) >= len(keyShares[opIdx])/2 {
+						cancel() // stop everything, test's alright
+						halfExitsErrorChan <- nil
+						stop = true
+					}
+				}
+				runtime.Gosched() // yield a little
+			}
+		}()
+	}
+
+	stop := false
+	for !stop {
+		select {
+		case err := <-egErrorChan:
+			require.NoError(t, err)
+			stop = true
+
+		case err := <-halfExitsErrorChan:
+			require.NoError(t, err)
+			return
+		}
+	}
 
 	mockEth2Cl := servers.Eth2Client(t, context.Background())
 
diff --git a/app/util/testutil/api_servers.go b/app/util/testutil/api_servers.go
index 62ba873..bc2e4fc 100644
--- a/app/util/testutil/api_servers.go
+++ b/app/util/testutil/api_servers.go
@@ -53,7 +53,7 @@ func (ts *TestServers) Eth2Client(t *testing.T, ctx context.Context) eth2wrap.Cl
 }
 
 // APIServers return an instance of TestServer with mocked Obol API and beacon node API from a given lock file.
-func APIServers(t *testing.T, lock cluster.Lock) TestServers {
+func APIServers(t *testing.T, lock cluster.Lock, withNonActiveVals bool) TestServers {
 	t.Helper()
 
 	oapiHandler, oapiAddLock := obolapi.MockServer()
@@ -63,11 +63,23 @@ func APIServers(t *testing.T, lock cluster.Lock) TestServers {
 
 	mockValidators := map[string]eth2v1.Validator{}
 
-	for _, val := range lock.Validators {
+	mightBeInactive := func(withNonActiveVals bool, idx int) eth2v1.ValidatorState {
+		if !withNonActiveVals {
+			return eth2v1.ValidatorStateActiveOngoing
+		}
+
+		if idx%2 == 0 {
+			return eth2v1.ValidatorStateActiveOngoing
+		}
+
+		return eth2v1.ValidatorStatePendingQueued // return a state which doesn't represent "validator is running"
+	}
+
+	for idx, val := range lock.Validators {
 		mockValidators[val.PublicKeyHex()] = eth2v1.Validator{
 			Index:   eth2p0.ValidatorIndex(rand.Int63()), //nolint:gosec // testing function
 			Balance: 42,
-			Status:  eth2v1.ValidatorStateActiveOngoing,
+			Status:  mightBeInactive(withNonActiveVals, idx),
 			Validator: &eth2p0.Validator{
 				PublicKey:             eth2p0.BLSPubKey(val.PubKey),
 				WithdrawalCredentials: testutil.RandomBytes32(),