Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app: interact with API more often #44

Merged
merged 3 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 80 additions & 55 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/jonboulle/clockwork"
"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/forkjoin"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
manifestpb "github.com/obolnetwork/charon/cluster/manifestpb/v1"
"github.com/obolnetwork/charon/eth2util/signing"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/tbls"
Expand All @@ -46,6 +46,7 @@ type Config struct {

// Timeouts for the external services the application talks to.
const (
	// maxBeaconNodeTimeout bounds requests to the beacon node
	// (presumably per HTTP request — confirm against callers not shown here).
	maxBeaconNodeTimeout = 10 * time.Second
	// obolAPITimeout bounds each HTTP request to the Obol API
	// (used by fetchFullExit and postPartialExit via context.WithTimeout).
	obolAPITimeout = 10 * time.Second
)

// Run runs the lido-dv-exit core logic.
Expand Down Expand Up @@ -109,9 +110,28 @@ func Run(ctx context.Context, config Config) error {
oAPI := obolapi.Client{ObolAPIUrl: config.ObolAPIURL}

var signedExits []obolapi.ExitBlob
fetchedSignedExits := map[string]struct{}{}

for slot := range slotTicker {
if len(valsKeys) == 0 {
// if we don't have any more keys to process, make sure we fetched *all* full exits before
// exiting this loop
if len(fetchedSignedExits) != len(signedExits) {
writeAllFullExits(
ctx,
bnClient,
oAPI,
cl,
signedExits,
fetchedSignedExits,
config.EjectorExitPath,
shareIdx,
identityKey,
)

continue
}

break // we finished signing everything we had to sign
}

Expand All @@ -120,6 +140,11 @@ func Run(ctx context.Context, config Config) error {
continue
}

log.Info(ctx, "Signing exit for available validators...", z.Int("available_validators", len(valsKeys)))

// signedExitsInRound holds the signed partial exits generated at this epoch's lifecycle round
var signedExitsInRound []obolapi.ExitBlob

phase0Vals, err := valsKeys.ValidatorsPhase0()
if err != nil {
return errors.Wrap(err, "validator keys to phase0")
Expand Down Expand Up @@ -168,75 +193,76 @@ func Run(ctx context.Context, config Config) error {
continue
}

log.Debug(ctx, "Signed exit")
signedExits = append(signedExits, obolapi.ExitBlob{
log.Info(ctx, "Signed partial exit")
gsora marked this conversation as resolved.
Show resolved Hide resolved
signedExitsInRound = append(signedExitsInRound, obolapi.ExitBlob{
PublicKey: validatorPubkStr,
SignedExitMessage: exit,
})

delete(valsKeys, keystore.ValidatorPubkey(validatorPubkStr))
}
}

tick := time.NewTicker(1 * time.Second)
if len(signedExitsInRound) != 0 {
// try posting the partial exits that have been produced at this stage
if err := postPartialExit(ctx, oAPI, cl.GetInitialMutationHash(), shareIdx, identityKey, signedExitsInRound...); err != nil {
log.Error(ctx, "Cannot post exits to obol api, will retry later", err)
} else {
for _, signedExit := range signedExitsInRound {
delete(valsKeys, keystore.ValidatorPubkey(signedExit.PublicKey))
}
}

// send signed exit to obol api
for range tick.C {
// we're retrying every second until we succeed
if postPartialExit(ctx, oAPI, cl.GetInitialMutationHash(), shareIdx, identityKey, signedExits...) {
tick.Stop()
break
signedExits = append(signedExits, signedExitsInRound...)
}
}

type fetchExitData struct {
lockHash []byte
validatorPubkey string
shareIndex uint64
identityKey *k1.PrivateKey
writeAllFullExits(
ctx,
bnClient,
oAPI,
cl,
signedExits,
fetchedSignedExits,
config.EjectorExitPath,
shareIdx,
identityKey,
)
}

fork, join, fjcancel := forkjoin.New(ctx, func(ctx context.Context, data fetchExitData) (struct{}, error) {
tick := time.NewTicker(1 * time.Second)
defer tick.Stop()
log.Info(ctx, "Successfully fetched exit messages!")

exitFSPath := filepath.Join(config.EjectorExitPath, fmt.Sprintf("validator-exit-%s.json", data.validatorPubkey))
return nil
}

for range tick.C {
if fetchFullExit(ctx, bnClient, oAPI, data.lockHash, data.validatorPubkey, exitFSPath, data.shareIndex, data.identityKey) {
break
}
// writeAllFullExits fetches and writes signedExits to disk.
func writeAllFullExits(
ctx context.Context,
eth2Cl eth2wrap.Client,
oAPI obolapi.Client,
cl *manifestpb.Cluster,
signedExits []obolapi.ExitBlob,
alreadySignedExits map[string]struct{},
ejectorExitPath string,
shareIndex uint64,
identityKey *k1.PrivateKey,
) {
for _, signedExit := range signedExits {
if _, ok := alreadySignedExits[signedExit.PublicKey]; ok {
continue // bypass already-fetched full exit
}

return struct{}{}, nil
}, forkjoin.WithWorkers(10), forkjoin.WithWaitOnCancel())
exitFSPath := filepath.Join(ejectorExitPath, fmt.Sprintf("validator-exit-%s.json", signedExit.PublicKey))

defer fjcancel()

for _, se := range signedExits {
fork(fetchExitData{
lockHash: cl.InitialMutationHash,
validatorPubkey: se.PublicKey,
shareIndex: shareIdx,
identityKey: identityKey,
})
}

_, err = join().Flatten()
if !fetchFullExit(ctx, eth2Cl, oAPI, cl.InitialMutationHash, signedExit.PublicKey, exitFSPath, shareIndex, identityKey) {
log.Debug(ctx, "Could not fetch full exit for validator", z.Str("validator", signedExit.PublicKey))
continue
}

if err != nil && !errors.Is(err, context.Canceled) {
return errors.Wrap(err, "fatal error while processing full exits from obol api, please get in contact with the development team as soon as possible, with a full log of the execution")
alreadySignedExits[signedExit.PublicKey] = struct{}{}
}

log.Info(ctx, "Successfully fetched exit messages!")

return nil
}

// fetchFullExit returns true if a full exit was received from the Obol API, and was written in exitFSPath.
// Each HTTP request has a 10 seconds timeout.
// Each HTTP request has a default timeout.
func fetchFullExit(ctx context.Context, eth2Cl eth2wrap.Client, oAPI obolapi.Client, lockHash []byte, validatorPubkey, exitFSPath string, shareIndex uint64, identityKey *k1.PrivateKey) bool {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
ctx, cancel := context.WithTimeout(ctx, obolAPITimeout)
defer cancel()

fullExit, err := oAPI.GetFullExit(ctx, validatorPubkey, lockHash, shareIndex, identityKey)
Expand Down Expand Up @@ -300,17 +326,16 @@ func fetchFullExit(ctx context.Context, eth2Cl eth2wrap.Client, oAPI obolapi.Cli
return true
}

func postPartialExit(ctx context.Context, oAPI obolapi.Client, mutationHash []byte, shareIndex uint64, identityKey *k1.PrivateKey, exitBlobs ...obolapi.ExitBlob) bool {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// postPartialExit posts exitBlobs to Obol API with a default HTTP request timeout.
func postPartialExit(ctx context.Context, oAPI obolapi.Client, mutationHash []byte, shareIndex uint64, identityKey *k1.PrivateKey, exitBlobs ...obolapi.ExitBlob) error {
ctx, cancel := context.WithTimeout(ctx, obolAPITimeout)
defer cancel()

// we're retrying every second until we succeed
if err := oAPI.PostPartialExit(ctx, mutationHash, shareIndex, identityKey, exitBlobs...); err != nil {
log.Error(ctx, "Cannot post exits to obol api", err)
return false
return errors.Wrap(err, "cannot post partial exit")
}

return true
return nil
}

// shouldProcessValidator returns true if a validator needs to be processed, meaning a full exit message must
Expand Down
2 changes: 1 addition & 1 deletion app/app_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Test_newSlotTicker(t *testing.T) {
cluster.WithVersion("v1.7.0"),
)

srvs := ldetestutil.APIServers(t, lock)
srvs := ldetestutil.APIServers(t, lock, false)
defer srvs.Close()

clock := clockwork.NewFakeClock()
Expand Down
85 changes: 81 additions & 4 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"

eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
Expand Down Expand Up @@ -43,7 +44,7 @@ func Test_NormalFlow(t *testing.T) {
cluster.WithVersion("v1.7.0"),
)

srvs := testutil.APIServers(t, lock)
srvs := testutil.APIServers(t, lock, false)
defer srvs.Close()

run(t,
Expand All @@ -53,6 +54,35 @@ func Test_NormalFlow(t *testing.T) {
keyShares,
true,
srvs,
false,
)
}

// Test_WithNonActiveVals runs the app against mock API servers that report
// half of the validators as non-active (withNonActiveVals=true), so only a
// subset of full exits is expected to be produced.
func Test_WithNonActiveVals(t *testing.T) {
	const (
		valAmt      = 100
		operatorAmt = 4
	)

	lock, enrs, keyShares := cluster.NewForT(
		t,
		valAmt,
		operatorAmt,
		operatorAmt,
		0,
		cluster.WithVersion("v1.7.0"),
	)

	// Mock servers with non-active validators enabled.
	srvs := testutil.APIServers(t, lock, true)
	defer srvs.Close()

	run(t,
		t.TempDir(),
		lock,
		enrs,
		keyShares,
		true,
		srvs,
		true,
	)
}

Expand All @@ -69,7 +99,7 @@ func Test_RunTwice(t *testing.T) {
cluster.WithVersion("v1.7.0"),
)

srvs := testutil.APIServers(t, lock)
srvs := testutil.APIServers(t, lock, false)
defer srvs.Close()

root := t.TempDir()
Expand All @@ -81,6 +111,7 @@ func Test_RunTwice(t *testing.T) {
keyShares,
true,
srvs,
false,
)

// delete half exits from each ejector directory
Expand All @@ -104,6 +135,7 @@ func Test_RunTwice(t *testing.T) {
keyShares,
false,
srvs,
false,
)
}

Expand All @@ -115,6 +147,7 @@ func run(
keyShares [][]tbls.PrivateKey,
createDirFiles bool,
servers testutil.TestServers,
withNonActiveVals bool,
) {
t.Helper()

Expand Down Expand Up @@ -177,7 +210,8 @@ func run(

eg := errgroup.Group{}

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for opIdx := 0; opIdx < operatorAmt; opIdx++ {
opIdx := opIdx
Expand All @@ -190,7 +224,50 @@ func run(
})
}

require.NoError(t, eg.Wait())
egErrorChan := make(chan error)
halfExitsErrorChan := make(chan error)

go func() {
egErrorChan <- eg.Wait()
}()

if withNonActiveVals {
// when withNonActiveVals is true, it means that we'll only produce half of the
// full exits.
go func() {
stop := false

for !stop {
for opIdx := 0; opIdx < len(enrs); opIdx++ {
opID := fmt.Sprintf("op%d", opIdx)

ejectorDir := filepath.Join(ejectorDir, opID)
files, err := os.ReadDir(ejectorDir)
require.NoError(t, err)

if len(files) >= len(keyShares[opIdx])/2 {
cancel() // stop everything, test's alright
halfExitsErrorChan <- nil
stop = true
}
}
runtime.Gosched() // yield a little
}
}()
}

stop := false
for !stop {
select {
case err := <-egErrorChan:
require.NoError(t, err)
stop = true

case err := <-halfExitsErrorChan:
require.NoError(t, err)
return
}
}

mockEth2Cl := servers.Eth2Client(t, context.Background())

Expand Down
18 changes: 15 additions & 3 deletions app/util/testutil/api_servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (ts *TestServers) Eth2Client(t *testing.T, ctx context.Context) eth2wrap.Cl
}

// APIServers return an instance of TestServer with mocked Obol API and beacon node API from a given lock file.
func APIServers(t *testing.T, lock cluster.Lock) TestServers {
func APIServers(t *testing.T, lock cluster.Lock, withNonActiveVals bool) TestServers {
t.Helper()

oapiHandler, oapiAddLock := obolapi.MockServer()
Expand All @@ -63,11 +63,23 @@ func APIServers(t *testing.T, lock cluster.Lock) TestServers {

mockValidators := map[string]eth2v1.Validator{}

for _, val := range lock.Validators {
mightBeInactive := func(withNonActiveVals bool, idx int) eth2v1.ValidatorState {
if !withNonActiveVals {
return eth2v1.ValidatorStateActiveOngoing
}

if idx%2 == 0 {
return eth2v1.ValidatorStateActiveOngoing
}

return eth2v1.ValidatorStatePendingQueued // return a state which doesn't represent "validator is running"
}

for idx, val := range lock.Validators {
mockValidators[val.PublicKeyHex()] = eth2v1.Validator{
Index: eth2p0.ValidatorIndex(rand.Int63()), //nolint:gosec // testing function
Balance: 42,
Status: eth2v1.ValidatorStateActiveOngoing,
Status: mightBeInactive(withNonActiveVals, idx),
Validator: &eth2p0.Validator{
PublicKey: eth2p0.BLSPubKey(val.PubKey),
WithdrawalCredentials: testutil.RandomBytes32(),
Expand Down
Loading