Commit

app: interact with API more often (#44)
* app: interact with API more often

Instead of waiting until *all* partial exits have been calculated before posting them to the API, post them whenever we get the chance.

Also, download and process full exits on the epoch boundary instead of waiting for the previous processing round to complete.

* cleanup: make sure to flush out all exits when no more keys need to be processed and we're at the epoch boundary

* cleanup
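For illustration, here is a minimal, self-contained Go sketch of the loop shape this change moves to: each round's partial exits are posted right away, and full exits are flushed on every epoch boundary (and kept flushing once nothing is left to sign). The types and helpers below (exitBlob, postPartialExits, writeFullExit, flushFullExits) are hypothetical stand-ins invented for this sketch, not the project's actual obolapi/keystore API, and the slot/epoch-boundary check and signing details are omitted.

package exitflow

import (
	"context"
	"log"
)

// exitBlob, postPartialExits and writeFullExit are simplified stand-ins for the
// real obolapi/keystore types; they exist only so this sketch is self-contained.
type exitBlob struct{ PublicKey string }

func postPartialExits(ctx context.Context, blobs ...exitBlob) error { return nil }

func writeFullExit(ctx context.Context, pubkey string) bool { return true }

// runLoop sketches the new per-round flow: partial exits are posted as soon as a
// round produces them, and full exits are flushed on every epoch boundary instead
// of once after everything has been signed.
func runLoop(ctx context.Context, slotTicker <-chan struct{}, valsKeys map[string]struct{}) {
	var signedExits []exitBlob
	fetched := map[string]struct{}{} // full exits already written to disk

	for range slotTicker {
		if len(valsKeys) == 0 {
			if len(fetched) != len(signedExits) {
				flushFullExits(ctx, signedExits, fetched)
				continue // keep flushing until every full exit is on disk
			}
			break // nothing left to sign or fetch
		}

		// Sign whatever keys are available in this round.
		var round []exitBlob
		for pubkey := range valsKeys {
			round = append(round, exitBlob{PublicKey: pubkey})
		}

		// Post this round's partial exits immediately; on failure they are
		// simply retried at the next epoch boundary.
		if err := postPartialExits(ctx, round...); err != nil {
			log.Printf("cannot post exits, will retry later: %v", err)
		} else {
			for _, blob := range round {
				delete(valsKeys, blob.PublicKey)
			}
			signedExits = append(signedExits, round...)
		}

		// Opportunistically fetch and write full exits for everything signed so far.
		flushFullExits(ctx, signedExits, fetched)
	}
}

// flushFullExits writes any full exit not yet on disk, skipping entries already in fetched.
func flushFullExits(ctx context.Context, signedExits []exitBlob, fetched map[string]struct{}) {
	for _, blob := range signedExits {
		if _, ok := fetched[blob.PublicKey]; ok {
			continue
		}
		if writeFullExit(ctx, blob.PublicKey) {
			fetched[blob.PublicKey] = struct{}{}
		}
	}
}

The key design choice is the fetched set: it lets the flush run on every boundary without re-writing exits that are already on disk.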
gsora authored Dec 15, 2023
1 parent 990beba commit b149e30
Showing 4 changed files with 177 additions and 63 deletions.
135 changes: 80 additions & 55 deletions app/app.go
@@ -20,9 +20,9 @@ import (
"github.com/jonboulle/clockwork"
"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/forkjoin"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
manifestpb "github.com/obolnetwork/charon/cluster/manifestpb/v1"
"github.com/obolnetwork/charon/eth2util/signing"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/tbls"
@@ -46,6 +46,7 @@ type Config struct {

const (
maxBeaconNodeTimeout = 10 * time.Second
obolAPITimeout = 10 * time.Second
)

// Run runs the lido-dv-exit core logic.
@@ -109,9 +110,28 @@ func Run(ctx context.Context, config Config) error {
oAPI := obolapi.Client{ObolAPIUrl: config.ObolAPIURL}

var signedExits []obolapi.ExitBlob
fetchedSignedExits := map[string]struct{}{}

for slot := range slotTicker {
if len(valsKeys) == 0 {
// if we don't have any more keys to process, make sure we fetched *all* full exits before
// exiting this loop
if len(fetchedSignedExits) != len(signedExits) {
writeAllFullExits(
ctx,
bnClient,
oAPI,
cl,
signedExits,
fetchedSignedExits,
config.EjectorExitPath,
shareIdx,
identityKey,
)

continue
}

break // we finished signing everything we had to sign
}

@@ -120,6 +140,11 @@ func Run(ctx context.Context, config Config) error {
continue
}

log.Info(ctx, "Signing exit for available validators...", z.Int("available_validators", len(valsKeys)))

// signedExitsInRound holds the signed partial exits generated at this epoch's lifecycle round
var signedExitsInRound []obolapi.ExitBlob

phase0Vals, err := valsKeys.ValidatorsPhase0()
if err != nil {
return errors.Wrap(err, "validator keys to phase0")
@@ -168,75 +193,76 @@ func Run(ctx context.Context, config Config) error {
continue
}

log.Debug(ctx, "Signed exit")
signedExits = append(signedExits, obolapi.ExitBlob{
log.Info(ctx, "Signed partial exit")
signedExitsInRound = append(signedExitsInRound, obolapi.ExitBlob{
PublicKey: validatorPubkStr,
SignedExitMessage: exit,
})

delete(valsKeys, keystore.ValidatorPubkey(validatorPubkStr))
}
}

tick := time.NewTicker(1 * time.Second)
if len(signedExitsInRound) != 0 {
// try posting the partial exits that have been produced at this stage
if err := postPartialExit(ctx, oAPI, cl.GetInitialMutationHash(), shareIdx, identityKey, signedExitsInRound...); err != nil {
log.Error(ctx, "Cannot post exits to obol api, will retry later", err)
} else {
for _, signedExit := range signedExitsInRound {
delete(valsKeys, keystore.ValidatorPubkey(signedExit.PublicKey))
}
}

// send signed exit to obol api
for range tick.C {
// we're retrying every second until we succeed
if postPartialExit(ctx, oAPI, cl.GetInitialMutationHash(), shareIdx, identityKey, signedExits...) {
tick.Stop()
break
signedExits = append(signedExits, signedExitsInRound...)
}
}

type fetchExitData struct {
lockHash []byte
validatorPubkey string
shareIndex uint64
identityKey *k1.PrivateKey
writeAllFullExits(
ctx,
bnClient,
oAPI,
cl,
signedExits,
fetchedSignedExits,
config.EjectorExitPath,
shareIdx,
identityKey,
)
}

fork, join, fjcancel := forkjoin.New(ctx, func(ctx context.Context, data fetchExitData) (struct{}, error) {
tick := time.NewTicker(1 * time.Second)
defer tick.Stop()
log.Info(ctx, "Successfully fetched exit messages!")

exitFSPath := filepath.Join(config.EjectorExitPath, fmt.Sprintf("validator-exit-%s.json", data.validatorPubkey))
return nil
}

for range tick.C {
if fetchFullExit(ctx, bnClient, oAPI, data.lockHash, data.validatorPubkey, exitFSPath, data.shareIndex, data.identityKey) {
break
}
// writeAllFullExits fetches and writes signedExits to disk.
func writeAllFullExits(
ctx context.Context,
eth2Cl eth2wrap.Client,
oAPI obolapi.Client,
cl *manifestpb.Cluster,
signedExits []obolapi.ExitBlob,
alreadySignedExits map[string]struct{},
ejectorExitPath string,
shareIndex uint64,
identityKey *k1.PrivateKey,
) {
for _, signedExit := range signedExits {
if _, ok := alreadySignedExits[signedExit.PublicKey]; ok {
continue // bypass already-fetched full exit
}

return struct{}{}, nil
}, forkjoin.WithWorkers(10), forkjoin.WithWaitOnCancel())
exitFSPath := filepath.Join(ejectorExitPath, fmt.Sprintf("validator-exit-%s.json", signedExit.PublicKey))

defer fjcancel()

for _, se := range signedExits {
fork(fetchExitData{
lockHash: cl.InitialMutationHash,
validatorPubkey: se.PublicKey,
shareIndex: shareIdx,
identityKey: identityKey,
})
}

_, err = join().Flatten()
if !fetchFullExit(ctx, eth2Cl, oAPI, cl.InitialMutationHash, signedExit.PublicKey, exitFSPath, shareIndex, identityKey) {
log.Debug(ctx, "Could not fetch full exit for validator", z.Str("validator", signedExit.PublicKey))
continue
}

if err != nil && !errors.Is(err, context.Canceled) {
return errors.Wrap(err, "fatal error while processing full exits from obol api, please get in contact with the development team as soon as possible, with a full log of the execution")
alreadySignedExits[signedExit.PublicKey] = struct{}{}
}

log.Info(ctx, "Successfully fetched exit messages!")

return nil
}

// fetchFullExit returns true if a full exit was received from the Obol API, and was written in exitFSPath.
// Each HTTP request has a 10 seconds timeout.
// Each HTTP request has a default timeout.
func fetchFullExit(ctx context.Context, eth2Cl eth2wrap.Client, oAPI obolapi.Client, lockHash []byte, validatorPubkey, exitFSPath string, shareIndex uint64, identityKey *k1.PrivateKey) bool {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
ctx, cancel := context.WithTimeout(ctx, obolAPITimeout)
defer cancel()

fullExit, err := oAPI.GetFullExit(ctx, validatorPubkey, lockHash, shareIndex, identityKey)
@@ -300,17 +326,16 @@ func fetchFullExit(ctx context.Context, eth2Cl eth2wrap.Client, oAPI obolapi.Cli
return true
}

func postPartialExit(ctx context.Context, oAPI obolapi.Client, mutationHash []byte, shareIndex uint64, identityKey *k1.PrivateKey, exitBlobs ...obolapi.ExitBlob) bool {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// postPartialExit posts exitBlobs to Obol API with a default HTTP request timeout.
func postPartialExit(ctx context.Context, oAPI obolapi.Client, mutationHash []byte, shareIndex uint64, identityKey *k1.PrivateKey, exitBlobs ...obolapi.ExitBlob) error {
ctx, cancel := context.WithTimeout(ctx, obolAPITimeout)
defer cancel()

// we're retrying every second until we succeed
if err := oAPI.PostPartialExit(ctx, mutationHash, shareIndex, identityKey, exitBlobs...); err != nil {
log.Error(ctx, "Cannot post exits to obol api", err)
return false
return errors.Wrap(err, "cannot post partial exit")
}

return true
return nil
}

// shouldProcessValidator returns true if a validator needs to be processed, meaning a full exit message must
2 changes: 1 addition & 1 deletion app/app_internal_test.go
@@ -72,7 +72,7 @@ func Test_newSlotTicker(t *testing.T) {
cluster.WithVersion("v1.7.0"),
)

srvs := ldetestutil.APIServers(t, lock)
srvs := ldetestutil.APIServers(t, lock, false)
defer srvs.Close()

clock := clockwork.NewFakeClock()
85 changes: 81 additions & 4 deletions app/app_test.go
@@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"

eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
@@ -43,7 +44,7 @@ func Test_NormalFlow(t *testing.T) {
cluster.WithVersion("v1.7.0"),
)

srvs := testutil.APIServers(t, lock)
srvs := testutil.APIServers(t, lock, false)
defer srvs.Close()

run(t,
@@ -53,6 +54,35 @@ func Test_NormalFlow(t *testing.T) {
keyShares,
true,
srvs,
false,
)
}

func Test_WithNonActiveVals(t *testing.T) {
valAmt := 100
operatorAmt := 4

lock, enrs, keyShares := cluster.NewForT(
t,
valAmt,
operatorAmt,
operatorAmt,
0,
cluster.WithVersion("v1.7.0"),
)

srvs := testutil.APIServers(t, lock, true)
defer srvs.Close()

td := t.TempDir()
run(t,
td,
lock,
enrs,
keyShares,
true,
srvs,
true,
)
}

@@ -69,7 +99,7 @@ func Test_RunTwice(t *testing.T) {
cluster.WithVersion("v1.7.0"),
)

srvs := testutil.APIServers(t, lock)
srvs := testutil.APIServers(t, lock, false)
defer srvs.Close()

root := t.TempDir()
@@ -81,6 +111,7 @@ func Test_RunTwice(t *testing.T) {
keyShares,
true,
srvs,
false,
)

// delete half exits from each ejector directory
@@ -104,6 +135,7 @@ func Test_RunTwice(t *testing.T) {
keyShares,
false,
srvs,
false,
)
}

@@ -115,6 +147,7 @@ func run(
keyShares [][]tbls.PrivateKey,
createDirFiles bool,
servers testutil.TestServers,
withNonActiveVals bool,
) {
t.Helper()

@@ -177,7 +210,8 @@

eg := errgroup.Group{}

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for opIdx := 0; opIdx < operatorAmt; opIdx++ {
opIdx := opIdx
@@ -190,7 +224,50 @@
})
}

require.NoError(t, eg.Wait())
egErrorChan := make(chan error)
halfExitsErrorChan := make(chan error)

go func() {
egErrorChan <- eg.Wait()
}()

if withNonActiveVals {
// when withNonActiveVals is true, it means that we'll only produce half of the
// full exits.
go func() {
stop := false

for !stop {
for opIdx := 0; opIdx < len(enrs); opIdx++ {
opID := fmt.Sprintf("op%d", opIdx)

ejectorDir := filepath.Join(ejectorDir, opID)
files, err := os.ReadDir(ejectorDir)
require.NoError(t, err)

if len(files) >= len(keyShares[opIdx])/2 {
cancel() // stop everything, test's alright
halfExitsErrorChan <- nil
stop = true
}
}
runtime.Gosched() // yield a little
}
}()
}

stop := false
for !stop {
select {
case err := <-egErrorChan:
require.NoError(t, err)
stop = true

case err := <-halfExitsErrorChan:
require.NoError(t, err)
return
}
}

mockEth2Cl := servers.Eth2Client(t, context.Background())

18 changes: 15 additions & 3 deletions app/util/testutil/api_servers.go
@@ -53,7 +53,7 @@ func (ts *TestServers) Eth2Client(t *testing.T, ctx context.Context) eth2wrap.Cl
}

// APIServers return an instance of TestServer with mocked Obol API and beacon node API from a given lock file.
func APIServers(t *testing.T, lock cluster.Lock) TestServers {
func APIServers(t *testing.T, lock cluster.Lock, withNonActiveVals bool) TestServers {
t.Helper()

oapiHandler, oapiAddLock := obolapi.MockServer()
@@ -63,11 +63,23 @@ func APIServers(t *testing.T, lock cluster.Lock) TestServers {

mockValidators := map[string]eth2v1.Validator{}

for _, val := range lock.Validators {
mightBeInactive := func(withNonActiveVals bool, idx int) eth2v1.ValidatorState {
if !withNonActiveVals {
return eth2v1.ValidatorStateActiveOngoing
}

if idx%2 == 0 {
return eth2v1.ValidatorStateActiveOngoing
}

return eth2v1.ValidatorStatePendingQueued // return a state which doesn't represent "validator is running"
}

for idx, val := range lock.Validators {
mockValidators[val.PublicKeyHex()] = eth2v1.Validator{
Index: eth2p0.ValidatorIndex(rand.Int63()), //nolint:gosec // testing function
Balance: 42,
Status: eth2v1.ValidatorStateActiveOngoing,
Status: mightBeInactive(withNonActiveVals, idx),
Validator: &eth2p0.Validator{
PublicKey: eth2p0.BLSPubKey(val.PubKey),
WithdrawalCredentials: testutil.RandomBytes32(),
