Skip to content

Commit

Permalink
fix loop (#115)
Browse files Browse the repository at this point in the history
* fix loop
  • Loading branch information
ToniRamirezM authored Aug 14, 2024
1 parent 9c639f1 commit 718d07f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 52 deletions.
129 changes: 79 additions & 50 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package aggregator

import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/json"
Expand Down Expand Up @@ -46,6 +45,10 @@ const (
mockedLocalExitRoot = "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e"
)

var (
busyError = errors.New("witness server is busy")
)

type finalProofMsg struct {
proverName string
proverID string
Expand Down Expand Up @@ -81,7 +84,7 @@ type Aggregator struct {
finalProof chan finalProofMsg
verifyingProof bool

witnessRetrievalChan chan *state.DBBatch
witnessRetrievalChan chan state.DBBatch

srv *grpc.Server
ctx context.Context
Expand Down Expand Up @@ -179,48 +182,47 @@ func New(
currentBatchStreamData: []byte{},
aggLayerClient: aggLayerClient,
sequencerPrivateKey: sequencerPrivateKey,
witnessRetrievalChan: make(chan *state.DBBatch, cfg.MaxWitnessRetrievalWorkers),
witnessRetrievalChan: make(chan state.DBBatch),
}

log.Infof("MaxWitnessRetrievalWorkers set to %d", cfg.MaxWitnessRetrievalWorkers)

// Set function to handle the batches from the data stream
a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream)
a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg)

return a, nil
}

func (a *Aggregator) retrieveWitnesses() {
select {
case <-a.ctx.Done():
return
case dbBatch := <-a.witnessRetrievalChan:
go a.retrieveWitness(dbBatch)
}
}

func (a *Aggregator) retrieveWitness(dbBatch *state.DBBatch) {
var (
success bool
err error
)

for !success {
// Get Witness
dbBatch.Witness, err = getWitness(a.currentStreamBatch.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness)
if err != nil {
log.Errorf("Failed to get witness for batch %d, err: %v", a.currentStreamBatch.BatchNumber, err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
}
func (a *Aggregator) retrieveWitness() {
var success bool
for {
dbBatch := <-a.witnessRetrievalChan
inner:
for !success {
var err error
// Get Witness
dbBatch.Witness, err = getWitness(dbBatch.Batch.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness)
if err != nil {
if err == busyError {
log.Debugf("Witness server is busy, retrying in %v", a.cfg.RetryTime.Duration)
} else {
log.Errorf("Failed to get witness for batch %d, err: %v", dbBatch.Batch.BatchNumber, err)
}
time.Sleep(a.cfg.RetryTime.Duration)
continue inner
}

err = a.state.AddBatch(a.ctx, dbBatch, nil)
if err != nil {
log.Errorf("Error adding batch: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
err = a.state.AddBatch(a.ctx, &dbBatch, nil)
if err != nil {
log.Errorf("Error adding batch: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
continue inner
}
success = true
}

success = true
success = false
}
}

Expand Down Expand Up @@ -384,14 +386,30 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
Witness: nil,
}

// Store batch in the DB and retrieve witness
// Check if the batch is already in the DB to keep its witness
wDBBatch, err := a.state.GetBatch(a.ctx, a.currentStreamBatch.BatchNumber, nil)
if err != nil {
if !errors.Is(err, state.ErrNotFound) {
log.Errorf("Error getting batch %d: %v", a.currentStreamBatch.BatchNumber, err)
return err
}
}

if wDBBatch != nil && wDBBatch.Witness != nil && len(wDBBatch.Witness) > 0 {
dbBatch.Witness = wDBBatch.Witness
}

// Store batch in the DB
err = a.state.AddBatch(a.ctx, &dbBatch, nil)
if err != nil {
log.Errorf("Error adding batch: %v", err)
return err
}

a.witnessRetrievalChan <- &dbBatch
// Retrieve the witness
if dbBatch.Witness == nil || len(dbBatch.Witness) == 0 {
a.witnessRetrievalChan <- dbBatch
}
}

// Reset current batch data
Expand Down Expand Up @@ -531,7 +549,9 @@ func (a *Aggregator) Start(ctx context.Context) error {
}()

// Witness retrieval workers
go a.retrieveWitnesses()
for i := 0; i < a.cfg.MaxWitnessRetrievalWorkers; i++ {
go a.retrieveWitness()
}

// Start stream client
err = a.streamClient.Start()
Expand Down Expand Up @@ -790,16 +810,16 @@ func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Cont
}

// buildFinalProof builds and return the final proof for an aggregated/batch proof.
func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (*prover.FinalProof, error) {
func (a *Aggregator) buildFinalProof(ctx context.Context, proverI proverInterface, proof *state.Proof) (*prover.FinalProof, error) {
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
"prover", proverI.Name(),
"proverId", proverI.ID(),
"proverAddr", proverI.Addr(),
"recursiveProofId", *proof.ProofID,
"batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal),
)

finalProofID, err := prover.FinalProof(proof.Proof, a.cfg.SenderAddress)
finalProofID, err := proverI.FinalProof(proof.Proof, a.cfg.SenderAddress)
if err != nil {
return nil, fmt.Errorf("failed to get final proof id: %w", err)
}
Expand All @@ -808,7 +828,7 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface
log.Infof("Final proof ID for batches [%d-%d]: %s", proof.BatchNumber, proof.BatchNumberFinal, *proof.ProofID)
log = log.WithFields("finalProofId", finalProofID)

finalProof, err := prover.WaitFinalProof(ctx, *proof.ProofID)
finalProof, err := proverI.WaitFinalProof(ctx, *proof.ProofID)
if err != nil {
return nil, fmt.Errorf("failed to get final proof from prover: %w", err)
}
Expand All @@ -828,15 +848,19 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface
}

// Sanity Check: state root from the proof must match the one from the final batch
finalDBBatch, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil)
if err != nil {
return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
}
if a.cfg.FinalProofSanityCheckEnabled {
finalDBBatch, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil)
if err != nil {
return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
}

if !bytes.Equal(finalProof.Public.NewStateRoot, finalDBBatch.Batch.StateRoot.Bytes()) {
for {
log.Errorf("State root from the final proof [%#x] does not match the one from the batch [%#x]. HALTED", finalProof.Public.NewStateRoot, finalDBBatch.Batch.StateRoot.Bytes())
time.Sleep(a.cfg.RetryTime.Duration)
if common.BytesToHash(finalProof.Public.NewStateRoot).String() != finalDBBatch.Batch.StateRoot.String() {
for {
log.Errorf("State root from the final proof does not match the expected for batch %d: Proof = [%s] Expected = [%s]", proof.BatchNumberFinal, string(finalProof.Public.NewStateRoot), finalDBBatch.Batch.StateRoot.String())
time.Sleep(a.cfg.RetryTime.Duration)
}
} else {
log.Infof("State root sanity check from the final proof for batch %d passed", proof.BatchNumberFinal)
}
}

Expand Down Expand Up @@ -1290,7 +1314,7 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
}

// Check if the witness is already in the DB
if dbBatch.Witness == nil {
if dbBatch.Witness == nil || len(dbBatch.Witness) == 0 {
log.Infof("Witness for batch %d is not yet in DB", batchNumberToVerify)
return nil, nil, nil, state.ErrNotFound
}
Expand Down Expand Up @@ -1603,6 +1627,8 @@ func getWitness(batchNumber uint64, URL string, fullWitness bool) ([]byte, error
witnessType = "full"
}

log.Infof("Requesting witness for batch %d of type %s", batchNumber, witnessType)

start := time.Now()
response, err = rpc.JSONRPCCall(URL, "zkevm_getBatchWitness", batchNumber, witnessType)
if err != nil {
Expand All @@ -1611,6 +1637,9 @@ func getWitness(batchNumber uint64, URL string, fullWitness bool) ([]byte, error

// Check if the response is an error
if response.Error != nil {
if response.Error.Message == "busy" {
return nil, busyError
}
return nil, fmt.Errorf("error from witness for batch %d: %v", batchNumber, response.Error)
}

Expand Down
3 changes: 3 additions & 0 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type Config struct {
// BatchProofSanityCheckEnabled is a flag to enable the sanity check of the batch proof
BatchProofSanityCheckEnabled bool `mapstructure:"BatchProofSanityCheckEnabled"`

// FinalProofSanityCheckEnabled is a flag to enable the sanity check of the final proof
FinalProofSanityCheckEnabled bool `mapstructure:"FinalProofSanityCheckEnabled"`

// ChainID is the L2 ChainID provided by the Network Config
ChainID uint64

Expand Down
8 changes: 6 additions & 2 deletions aggregator/prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,12 @@ func (p *Prover) WaitRecursiveProof(ctx context.Context, proofID string) (string
}

sr, err := GetStateRootFromProof(res.Proof.(*GetProofResponse_RecursiveProof).RecursiveProof)
if err != nil {
log.Info("Recursive proof does not contain state root")
if err != nil && sr != (common.Hash{}) {
log.Errorf("Error getting state root from proof: %v", err)
}

if sr == (common.Hash{}) {
log.Info("Recursive proof does not contain state root. Possibly mock prover is in use.")
}

resProof := res.Proof.(*GetProofResponse_RecursiveProof)
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ SenderAddress = ""
CleanupLockedProofsInterval = "2m"
GeneratingProofCleanupThreshold = "10m"
BatchProofSanityCheckEnabled = true
FinalProofSanityCheckEnabled = true
ForkId = 9
GasOffset = 0
WitnessURL = "localhost:8123"
Expand Down

0 comments on commit 718d07f

Please sign in to comment.