diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 6d3e96ab..cefe1a56 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -42,6 +42,7 @@ const ( dataStreamType = 1 mockedStateRoot = "0x090bcaf734c4f06c93954a827b45a6e8c67b8e0fd1e0a35a1c5982d6961828f9" mockedLocalExitRoot = "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e" + maxDBBigIntValue = 9223372036854775807 ) var ( @@ -171,6 +172,7 @@ func New( } a := &Aggregator{ + ctx: ctx, cfg: cfg, state: stateInterface, etherman: etherman, @@ -188,10 +190,15 @@ func New( witnessRetrievalChan: make(chan state.DBBatch), } + if a.ctx == nil { + a.ctx, a.exit = context.WithCancel(a.ctx) + } + // Set function to handle the batches from the data stream if !cfg.SyncModeOnlyEnabled { a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream) a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg) + a.l1Syncr.SetCallbackOnRollbackBatches(a.handleRollbackBatches) } return a, nil @@ -232,14 +239,12 @@ func (a *Aggregator) retrieveWitness() { func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { log.Warnf("Reorg detected, reorgData: %+v", reorgData) - ctx := context.Background() - // Get new latest verified batch number - lastVBatchNumber, err := a.l1Syncr.GetLastestVirtualBatchNumber(ctx) + lastVBatchNumber, err := a.l1Syncr.GetLastestVirtualBatchNumber(a.ctx) if err != nil { log.Errorf("Error getting last virtual batch number: %v", err) } else { - err = a.state.DeleteBatchesNewerThanBatchNumber(ctx, lastVBatchNumber, nil) + err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, lastVBatchNumber, nil) if err != nil { log.Errorf("Error deleting batches newer than batch number %d: %v", lastVBatchNumber, err) } @@ -248,11 +253,106 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { // Halt the aggregator a.halted.Store(true) for { - log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been delete so it is safe to manually restart the aggregator.") + log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.") time.Sleep(10 * time.Second) // nolint:gomnd } } +func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBatchesData) { + log.Warnf("Rollback batches event, rollbackBatchesData: %+v", rollbackData) + + // Stop Reading the data stream + err := a.streamClient.ExecCommandStop() + if err != nil { + log.Errorf("failed to stop data stream: %v", err) + } else { + log.Info("Data stream client stopped") + } + + // Get new last verified batch number from L1 + var lastVerifiedBatchNumber uint64 + if err == nil { + lastVerifiedBatchNumber, err = a.etherman.GetLatestVerifiedBatchNum() + if err != nil { + log.Errorf("Error getting latest verified batch number: %v", err) + } + } + + // Check lastVerifiedBatchNumber makes sense + if err == nil && lastVerifiedBatchNumber > rollbackData.LastBatchNumber { + err = fmt.Errorf("last verified batch number %d is greater than the last batch number %d in the rollback data", lastVerifiedBatchNumber, rollbackData.LastBatchNumber) + } + + // Delete invalidated batches + if err == nil { + err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil) + if err != nil { + log.Errorf("Error deleting batches newer than batch number %d: %v", rollbackData.LastBatchNumber, err) + } else { + log.Infof("Deleted batches newer than batch number %d", rollbackData.LastBatchNumber) + } + } + + // Older batches data can also be deleted + if err == nil { + err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil) + if err != nil { + log.Errorf("Error deleting batches older than batch number %d: %v", rollbackData.LastBatchNumber, err) + } else { + log.Infof("Deleted batches older than batch number %d", rollbackData.LastBatchNumber) + } + } + + // Delete wip proofs + if err == nil { + err = a.state.DeleteUngeneratedProofs(a.ctx, nil) + if err != nil { + log.Errorf("Error deleting ungenerated proofs: %v", err) + } else { + log.Info("Deleted ungenerated proofs") + } + } + + // Delete any proof for the batches that have been rolled back + if err == nil { + err = a.state.DeleteGeneratedProofs(a.ctx, rollbackData.LastBatchNumber+1, maxDBBigIntValue, nil) + if err != nil { + log.Errorf("Error deleting generated proofs: %v", err) + } else { + log.Infof("Deleted generated proofs for batches newer than %d", rollbackData.LastBatchNumber) + } + } + + if err == nil { + var marshalledBookMark []byte + // Resume reading the data stream + bookMark := &datastream.BookMark{ + Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH, + Value: rollbackData.LastBatchNumber + 1, + } + + marshalledBookMark, err = proto.Marshal(bookMark) + if err != nil { + log.Error("failed to marshal bookmark: %v", err) + } else { + err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark) + if err != nil { + log.Errorf("failed to connect to data stream: %v", err) + } + log.Info("Data stream client resumed") + } + } + + if err == nil { + log.Info("Handling rollback batches event finished successfully") + } else { + for { + log.Errorf("Error handling rollback batches event: %v", err) + time.Sleep(a.cfg.RetryTime.Duration) + } + } +} + func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, client *datastreamer.StreamClient, server *datastreamer.StreamServer) error { forcedBlockhashL1 := common.Hash{} @@ -479,15 +579,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli } // Start starts the aggregator -func (a *Aggregator) Start(ctx context.Context) error { - var cancel context.CancelFunc - if ctx == nil { - ctx = context.Background() - } - ctx, cancel = context.WithCancel(ctx) - a.ctx = ctx - a.exit = cancel - +func (a *Aggregator) Start() error { // Initial L1 Sync blocking err := a.l1Syncr.Sync(true) if err != nil { @@ -523,18 +615,18 @@ func (a *Aggregator) Start(ctx context.Context) error { } // Cleanup data base - err = a.state.DeleteBatchesOlderThanBatchNumber(ctx, lastVerifiedBatchNumber, nil) + err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, lastVerifiedBatchNumber, nil) if err != nil { return err } // Delete ungenerated recursive proofs - err = a.state.DeleteUngeneratedProofs(ctx, nil) + err = a.state.DeleteUngeneratedProofs(a.ctx, nil) if err != nil { return fmt.Errorf("failed to initialize proofs cache %w", err) } - accInputHash, err := a.getVerifiedBatchAccInputHash(ctx, lastVerifiedBatchNumber) + accInputHash, err := a.getVerifiedBatchAccInputHash(a.ctx, lastVerifiedBatchNumber) if err != nil { return err } @@ -544,7 +636,7 @@ func (a *Aggregator) Start(ctx context.Context) error { // Store Acc Input Hash of the latest verified batch dummyDBBatch := state.DBBatch{Batch: state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash}, Datastream: []byte{0}, Witness: []byte{0}} - err = a.state.AddBatch(ctx, &dummyDBBatch, nil) + err = a.state.AddBatch(a.ctx, &dummyDBBatch, nil) if err != nil { return err } @@ -591,8 +683,8 @@ func (a *Aggregator) Start(ctx context.Context) error { }() } - <-ctx.Done() - return ctx.Err() + <-a.ctx.Done() + return a.ctx.Err() } // Stop stops the Aggregator server. diff --git a/cmd/run.go b/cmd/run.go index 3913cb8b..b960b376 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -83,7 +83,7 @@ func start(cliCtx *cli.Context) error { aggregator := createAggregator(cliCtx.Context, *c, !cliCtx.Bool(config.FlagMigrations)) // start aggregator in a goroutine, checking for errors go func() { - if err := aggregator.Start(cliCtx.Context); err != nil { + if err := aggregator.Start(); err != nil { log.Fatal(err) } }() diff --git a/go.mod b/go.mod index 2cd03b57..eb37abe8 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 - github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.6 + github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0 github.com/ethereum/go-ethereum v1.14.5 github.com/hermeznetwork/tracerr v0.3.2 github.com/iden3/go-iden3-crypto v0.0.16 diff --git a/go.sum b/go.sum index 0e2126db..cb93abd1 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 h1:zJ06KCGLMDOap4slop/QmiM github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3/go.mod h1:bv7DjATsczN2WvFt26jv34TWv6rfvYM1SqegrgrFwfI= github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 h1:QElCysO7f2xaknY/RDjxcs7IVmcgORfsCX2g+YD0Ko4= github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234/go.mod h1:zBZWxwOHKlw+ghd9roQLgIkDZWA7e7qO3EsfQQT/+oQ= -github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.6 h1:BlX6ucNiBVkulBUGBhDHFpK7/D9TrDprc9nxxmQ0Qp4= -github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.6/go.mod h1:xUTgenGeFS7rAoTVMrlkr94b72gXXdzYcqDyBuY1Kwc= +github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0 h1:h/B5AzWSZTxb1HouulXeE9nbHD1d4/nc67ZQc0khAQA= +github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0/go.mod h1:+tQwkDf+5AL3dgL6G1t0qmwct0NJDlGlzqycOM5jn5g= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=