Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimistic v2 slowpath #530

Open
wants to merge 9 commits into
base: mikeneuder-2023-09-08-01
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions services/api/optimistic_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -12,6 +13,7 @@ import (
"time"

"github.com/alicebob/miniredis/v2"
"github.com/attestantio/go-builder-client/api/capella"
v1 "github.com/attestantio/go-builder-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/bellatrix"
consensuscapella "github.com/attestantio/go-eth2-client/spec/capella"
Expand Down Expand Up @@ -640,3 +642,81 @@ func TestBuilderApiSubmitNewBlockOptimisticV2_full(t *testing.T) {
})
}
}

func TestBuilderApiOptimisticV2SlowPath_fail_ssz_decode_header(t *testing.T) {
pubkey, _ := generateKeyPair(t)
backend := startTestBackend(t, pubkey)
outBytes := make([]byte, 944) // 944 bytes is min required to try ssz decoding.
outBytes[0] = 0xaa

r := bytes.NewReader(outBytes)

backend.relay.optimisticV2SlowPath(r, v2SlowPathOpts{
payload: &common.BuilderSubmitBlockRequest{
Capella: &capella.SubmitBlockRequest{
Message: &v1.BidTrace{
BuilderPubkey: *pubkey,
},
},
},
})

// Check that demotion occurred.
mockDB, ok := backend.relay.db.(*database.MockDB)
require.True(t, ok)
require.Equal(t, mockDB.Demotions[pubkey.String()], true)
}

func TestBuilderApiOptimisticV2SlowPath(t *testing.T) {
pubkey, secretkey := generateKeyPair(t)

testReq := common.TestBuilderSubmitBlockRequestV2(secretkey, getTestBidTrace(*pubkey, 10))
testPayload := &common.BuilderSubmitBlockRequest{
Capella: &capella.SubmitBlockRequest{
Message: &v1.BidTrace{
BuilderPubkey: *pubkey,
},
ExecutionPayload: &consensuscapella.ExecutionPayload{},
},
}

testCases := []struct {
description string
simError error
expectDemotion bool
}{
{
description: "success",
},
{
description: "failure_empty_payload",
simError: errFake,
expectDemotion: true,
},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
backend := startTestBackend(t, pubkey)
backend.relay.blockSimRateLimiter = &MockBlockSimulationRateLimiter{
simulationError: tc.simError,
}

submissionBytes, err := testReq.MarshalSSZ()
require.NoError(t, err)

r := bytes.NewReader(submissionBytes)

opts := v2SlowPathOpts{
payload: testPayload,
entry: &blockBuilderCacheEntry{},
}
backend.relay.optimisticV2SlowPath(r, opts)

// Check demotion status.
mockDB, ok := backend.relay.db.(*database.MockDB)
require.True(t, ok)
require.Equal(t, mockDB.Demotions[pubkey.String()], tc.expectDemotion)
})
}
}
128 changes: 109 additions & 19 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2311,15 +2311,12 @@ func (api *RelayAPI) handleSubmitNewBlockV2(w http.ResponseWriter, req *http.Req
remainderReader := bytes.NewReader(remainder)

slowPathOpts := v2SlowPathOpts{
header: &header,
payload: payload,
receivedAt: receivedAt,
eligibleAt: eligibleAt,
pf: pf,
isCancellationEnabled: isCancellationEnabled,
entry: builderEntry,
gasLimit: gasLimit,
pipeliner: tx,
payload: payload,
receivedAt: receivedAt,
eligibleAt: eligibleAt,
pf: pf,
entry: builderEntry,
gasLimit: gasLimit,
}

// Join the header bytes with the remaining bytes.
Expand All @@ -2333,21 +2330,114 @@ func (api *RelayAPI) handleSubmitNewBlockV2(w http.ResponseWriter, req *http.Req
}

type v2SlowPathOpts struct {
header *common.SubmitBlockRequestV2Optimistic
payload *common.BuilderSubmitBlockRequest
receivedAt time.Time
eligibleAt time.Time
pf common.Profile
isCancellationEnabled bool
entry *blockBuilderCacheEntry
gasLimit uint64
pipeliner redis.Pipeliner
payload *common.BuilderSubmitBlockRequest
receivedAt time.Time
eligibleAt time.Time
pf common.Profile
entry *blockBuilderCacheEntry
gasLimit uint64
}

func (api *RelayAPI) optimisticV2SlowPath(r io.Reader, v2Opts v2SlowPathOpts) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit of docs would be nice, to outline what'll happen in this function

log := api.log.WithFields(logrus.Fields{"method": "optimisticV2SlowPath"})

// TODO(mikeneuder): slow path
payload := v2Opts.payload
msg, err := io.ReadAll(r)
if err != nil {
demotionErr := fmt.Errorf("%w: could not read full message", err)
api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr)
log.WithError(err).Warn("could not read full message")
return
}

// Unmarshall full request.
var req common.SubmitBlockRequestV2Optimistic
err = req.UnmarshalSSZ(msg)
if err != nil {
demotionErr := fmt.Errorf("%w: could not unmarshall full request", err)
api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr)
log.WithError(err).Warn("could not unmarshall full request")
return
}

// Fill in txns and withdrawals.
payload.Capella.ExecutionPayload.Transactions = req.Transactions
payload.Capella.ExecutionPayload.Withdrawals = req.Withdrawals

getPayloadResponse, err := common.BuildGetPayloadResponse(payload)
if err != nil {
demotionErr := fmt.Errorf("%w: could not construct getPayloadResponse", err)
api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr)
log.WithError(err).Warn("could not construct getPayloadResponse")
return
}

// Create the redis pipeline tx
tx := api.redis.NewTxPipeline()

// Save payload.
err = api.redis.SaveExecutionPayloadCapella(context.Background(), tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella)
if err != nil {
demotionErr := fmt.Errorf("%w: could not save execution payload", err)
api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr)
log.WithError(err).Warn("could not save execution payload")
return
}

currentTime := time.Now().UTC()
log.WithFields(logrus.Fields{
"timeStampExecutionPayloadSaved": currentTime.UnixMilli(),
"timeSinceReceivedAt": v2Opts.receivedAt.Sub(currentTime).Milliseconds(),
"timeSinceEligibleAt": v2Opts.eligibleAt.Sub(currentTime).Milliseconds(),
}).Info("v2 execution payload saved")

// Used to communicate simulation result to the deferred function.
simResultC := make(chan *blockSimResult, 1)

// Deferred saving of the builder submission to database (whenever this function ends)
defer func() {
savePayloadToDatabase := !api.ffDisablePayloadDBStorage
var simResult *blockSimResult
select {
case simResult = <-simResultC:
case <-time.After(10 * time.Second):
log.Warn("timed out waiting for simulation result")
simResult = &blockSimResult{false, false, nil, nil}
}

submissionEntry, err := api.db.SaveBuilderBlockSubmission(payload, simResult.requestErr, simResult.validationErr, v2Opts.receivedAt, v2Opts.eligibleAt, simResult.wasSimulated, savePayloadToDatabase, v2Opts.pf, simResult.optimisticSubmission)
if err != nil {
log.WithError(err).WithField("payload", payload).Error("saving builder block submission to database failed")
return
}

err = api.db.UpsertBlockBuilderEntryAfterSubmission(submissionEntry, simResult.validationErr != nil)
if err != nil {
log.WithError(err).Error("failed to upsert block-builder-entry")
}
}()

// Simulate the block submission and save to db
timeBeforeValidation := time.Now().UTC()

log = log.WithFields(logrus.Fields{
"timestampBeforeValidation": timeBeforeValidation.UTC().UnixMilli(),
})

// Construct simulation request.
opts := blockSimOptions{
isHighPrio: v2Opts.entry.status.IsHighPrio,
log: log,
builder: v2Opts.entry,
req: &common.BuilderBlockValidationRequest{
BuilderSubmitBlockRequest: *payload,
RegisteredGasLimit: v2Opts.gasLimit,
},
}
api.processOptimisticBlock(opts, simResultC)

nextTime := time.Now().UTC()
v2Opts.pf.Simulation = uint64(nextTime.Sub(v2Opts.eligibleAt).Microseconds())

// All done
log.Info("received v2 block from builder")
Expand Down