Skip to content

Commit

Permalink
feat: handle rollback batches (#52)
Browse files Browse the repository at this point in the history
* feat: handle rollback batches
  • Loading branch information
ToniRamirezM authored Aug 28, 2024
1 parent 7c419d5 commit eaafcc1
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 24 deletions.
132 changes: 112 additions & 20 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
dataStreamType = 1
mockedStateRoot = "0x090bcaf734c4f06c93954a827b45a6e8c67b8e0fd1e0a35a1c5982d6961828f9"
mockedLocalExitRoot = "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e"
maxDBBigIntValue = 9223372036854775807
)

var (
Expand Down Expand Up @@ -171,6 +172,7 @@ func New(
}

a := &Aggregator{
ctx: ctx,
cfg: cfg,
state: stateInterface,
etherman: etherman,
Expand All @@ -188,10 +190,15 @@ func New(
witnessRetrievalChan: make(chan state.DBBatch),
}

if a.ctx == nil {
a.ctx, a.exit = context.WithCancel(a.ctx)
}

// Set function to handle the batches from the data stream
if !cfg.SyncModeOnlyEnabled {
a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream)
a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg)
a.l1Syncr.SetCallbackOnRollbackBatches(a.handleRollbackBatches)
}

return a, nil
Expand Down Expand Up @@ -232,14 +239,12 @@ func (a *Aggregator) retrieveWitness() {
func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
log.Warnf("Reorg detected, reorgData: %+v", reorgData)

ctx := context.Background()

// Get new latest verified batch number
lastVBatchNumber, err := a.l1Syncr.GetLastestVirtualBatchNumber(ctx)
lastVBatchNumber, err := a.l1Syncr.GetLastestVirtualBatchNumber(a.ctx)
if err != nil {
log.Errorf("Error getting last virtual batch number: %v", err)
} else {
err = a.state.DeleteBatchesNewerThanBatchNumber(ctx, lastVBatchNumber, nil)
err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, lastVBatchNumber, nil)
if err != nil {
log.Errorf("Error deleting batches newer than batch number %d: %v", lastVBatchNumber, err)
}
Expand All @@ -248,11 +253,106 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
// Halt the aggregator
a.halted.Store(true)
for {
log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been delete so it is safe to manually restart the aggregator.")
log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.")
time.Sleep(10 * time.Second) // nolint:gomnd
}
}

func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBatchesData) {
log.Warnf("Rollback batches event, rollbackBatchesData: %+v", rollbackData)

// Stop Reading the data stream
err := a.streamClient.ExecCommandStop()
if err != nil {
log.Errorf("failed to stop data stream: %v", err)
} else {
log.Info("Data stream client stopped")
}

// Get new last verified batch number from L1
var lastVerifiedBatchNumber uint64
if err == nil {
lastVerifiedBatchNumber, err = a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
log.Errorf("Error getting latest verified batch number: %v", err)
}
}

// Check lastVerifiedBatchNumber makes sense
if err == nil && lastVerifiedBatchNumber > rollbackData.LastBatchNumber {
err = fmt.Errorf("last verified batch number %d is greater than the last batch number %d in the rollback data", lastVerifiedBatchNumber, rollbackData.LastBatchNumber)
}

// Delete invalidated batches
if err == nil {
err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil)
if err != nil {
log.Errorf("Error deleting batches newer than batch number %d: %v", rollbackData.LastBatchNumber, err)
} else {
log.Infof("Deleted batches newer than batch number %d", rollbackData.LastBatchNumber)
}
}

// Older batches data can also be deleted
if err == nil {
err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil)
if err != nil {
log.Errorf("Error deleting batches older than batch number %d: %v", rollbackData.LastBatchNumber, err)
} else {
log.Infof("Deleted batches older than batch number %d", rollbackData.LastBatchNumber)
}
}

// Delete wip proofs
if err == nil {
err = a.state.DeleteUngeneratedProofs(a.ctx, nil)
if err != nil {
log.Errorf("Error deleting ungenerated proofs: %v", err)
} else {
log.Info("Deleted ungenerated proofs")
}
}

// Delete any proof for the batches that have been rolled back
if err == nil {
err = a.state.DeleteGeneratedProofs(a.ctx, rollbackData.LastBatchNumber+1, maxDBBigIntValue, nil)
if err != nil {
log.Errorf("Error deleting generated proofs: %v", err)
} else {
log.Infof("Deleted generated proofs for batches newer than %d", rollbackData.LastBatchNumber)
}
}

if err == nil {
var marshalledBookMark []byte
// Resume reading the data stream
bookMark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: rollbackData.LastBatchNumber + 1,
}

marshalledBookMark, err = proto.Marshal(bookMark)
if err != nil {
log.Error("failed to marshal bookmark: %v", err)
} else {
err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark)
if err != nil {
log.Errorf("failed to connect to data stream: %v", err)
}
log.Info("Data stream client resumed")
}
}

if err == nil {
log.Info("Handling rollback batches event finished successfully")
} else {
for {
log.Errorf("Error handling rollback batches event: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
}
}
}

func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, client *datastreamer.StreamClient, server *datastreamer.StreamServer) error {
forcedBlockhashL1 := common.Hash{}

Expand Down Expand Up @@ -479,15 +579,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
}

// Start starts the aggregator
func (a *Aggregator) Start(ctx context.Context) error {
var cancel context.CancelFunc
if ctx == nil {
ctx = context.Background()
}
ctx, cancel = context.WithCancel(ctx)
a.ctx = ctx
a.exit = cancel

func (a *Aggregator) Start() error {
// Initial L1 Sync blocking
err := a.l1Syncr.Sync(true)
if err != nil {
Expand Down Expand Up @@ -523,18 +615,18 @@ func (a *Aggregator) Start(ctx context.Context) error {
}

// Cleanup data base
err = a.state.DeleteBatchesOlderThanBatchNumber(ctx, lastVerifiedBatchNumber, nil)
err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, lastVerifiedBatchNumber, nil)
if err != nil {
return err
}

// Delete ungenerated recursive proofs
err = a.state.DeleteUngeneratedProofs(ctx, nil)
err = a.state.DeleteUngeneratedProofs(a.ctx, nil)
if err != nil {
return fmt.Errorf("failed to initialize proofs cache %w", err)
}

accInputHash, err := a.getVerifiedBatchAccInputHash(ctx, lastVerifiedBatchNumber)
accInputHash, err := a.getVerifiedBatchAccInputHash(a.ctx, lastVerifiedBatchNumber)
if err != nil {
return err
}
Expand All @@ -544,7 +636,7 @@ func (a *Aggregator) Start(ctx context.Context) error {

// Store Acc Input Hash of the latest verified batch
dummyDBBatch := state.DBBatch{Batch: state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash}, Datastream: []byte{0}, Witness: []byte{0}}
err = a.state.AddBatch(ctx, &dummyDBBatch, nil)
err = a.state.AddBatch(a.ctx, &dummyDBBatch, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -591,8 +683,8 @@ func (a *Aggregator) Start(ctx context.Context) error {
}()
}

<-ctx.Done()
return ctx.Err()
<-a.ctx.Done()
return a.ctx.Err()
}

// Stop stops the Aggregator server.
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func start(cliCtx *cli.Context) error {
aggregator := createAggregator(cliCtx.Context, *c, !cliCtx.Bool(config.FlagMigrations))
// start aggregator in a goroutine, checking for errors
go func() {
if err := aggregator.Start(cliCtx.Context); err != nil {
if err := aggregator.Start(); err != nil {
log.Fatal(err)
}
}()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3
github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.6
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0
github.com/ethereum/go-ethereum v1.14.5
github.com/hermeznetwork/tracerr v0.3.2
github.com/iden3/go-iden3-crypto v0.0.16
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 h1:zJ06KCGLMDOap4slop/QmiM
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3/go.mod h1:bv7DjATsczN2WvFt26jv34TWv6rfvYM1SqegrgrFwfI=
github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 h1:QElCysO7f2xaknY/RDjxcs7IVmcgORfsCX2g+YD0Ko4=
github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234/go.mod h1:zBZWxwOHKlw+ghd9roQLgIkDZWA7e7qO3EsfQQT/+oQ=
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.6 h1:BlX6ucNiBVkulBUGBhDHFpK7/D9TrDprc9nxxmQ0Qp4=
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.6/go.mod h1:xUTgenGeFS7rAoTVMrlkr94b72gXXdzYcqDyBuY1Kwc=
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0 h1:h/B5AzWSZTxb1HouulXeE9nbHD1d4/nc67ZQc0khAQA=
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0/go.mod h1:+tQwkDf+5AL3dgL6G1t0qmwct0NJDlGlzqycOM5jn5g=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
Expand Down

0 comments on commit eaafcc1

Please sign in to comment.