Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM committed Aug 13, 2024
1 parent b4fa3f1 commit f762b8f
Showing 1 changed file with 38 additions and 62 deletions.
100 changes: 38 additions & 62 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ type Aggregator struct {
finalProof chan finalProofMsg
verifyingProof bool

activeWitnessRetrievalWorkers int
witnessRetrievalChan chan state.DBBatch
activeWitnessRetrievalWorkersMutex sync.Mutex
witnessRetrievalChan chan state.DBBatch

srv *grpc.Server
ctx context.Context
Expand Down Expand Up @@ -167,23 +165,21 @@ func New(
}

a := &Aggregator{
cfg: cfg,
state: stateInterface,
etherman: etherman,
ethTxManager: ethTxManager,
streamClient: streamClient,
l1Syncr: l1Syncr,
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
timeSendFinalProofMutex: &sync.RWMutex{},
timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,
finalProof: make(chan finalProofMsg),
currentBatchStreamData: []byte{},
aggLayerClient: aggLayerClient,
sequencerPrivateKey: sequencerPrivateKey,
witnessRetrievalChan: make(chan state.DBBatch),
activeWitnessRetrievalWorkers: 0,
activeWitnessRetrievalWorkersMutex: sync.Mutex{},
cfg: cfg,
state: stateInterface,
etherman: etherman,
ethTxManager: ethTxManager,
streamClient: streamClient,
l1Syncr: l1Syncr,
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
timeSendFinalProofMutex: &sync.RWMutex{},
timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,
finalProof: make(chan finalProofMsg),
currentBatchStreamData: []byte{},
aggLayerClient: aggLayerClient,
sequencerPrivateKey: sequencerPrivateKey,
witnessRetrievalChan: make(chan state.DBBatch),
}

log.Infof("MaxWitnessRetrievalWorkers set to %d", cfg.MaxWitnessRetrievalWorkers)
Expand All @@ -195,54 +191,32 @@ func New(
return a, nil
}

func (a *Aggregator) retrieveWitnesses() {
currentWorkers := 0
func (a *Aggregator) retrieveWitness() {
var success bool
for {
dbBatch := <-a.witnessRetrievalChan
a.activeWitnessRetrievalWorkersMutex.Lock()
currentWorkers = a.activeWitnessRetrievalWorkers
a.activeWitnessRetrievalWorkersMutex.Unlock()

for currentWorkers >= a.cfg.MaxWitnessRetrievalWorkers {
time.Sleep(a.cfg.RetryTime.Duration)
a.activeWitnessRetrievalWorkersMutex.Lock()
currentWorkers = a.activeWitnessRetrievalWorkers
a.activeWitnessRetrievalWorkersMutex.Unlock()
}
a.activeWitnessRetrievalWorkersMutex.Lock()
a.activeWitnessRetrievalWorkers++
a.activeWitnessRetrievalWorkersMutex.Unlock()
go a.retrieveWitness(dbBatch)
}
}

func (a *Aggregator) retrieveWitness(dbBatch state.DBBatch) {
var success bool

for !success {
// Get Witness
witness, err := getWitness(dbBatch.Batch.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness)
if err != nil {
log.Errorf("Failed to get witness for batch %d, err: %v", dbBatch.Batch.BatchNumber, err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
}
for !success {
// Get Witness
witness, err := getWitness(dbBatch.Batch.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness)
if err != nil {
log.Errorf("Failed to get witness for batch %d, err: %v", dbBatch.Batch.BatchNumber, err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
}

dbBatch.Witness = witness
dbBatch.Witness = witness

err = a.state.AddBatch(a.ctx, &dbBatch, nil)
if err != nil {
log.Errorf("Error adding batch: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
err = a.state.AddBatch(a.ctx, &dbBatch, nil)
if err != nil {
log.Errorf("Error adding batch: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
}
success = true
}

success = true
success = false
}

a.activeWitnessRetrievalWorkersMutex.Lock()
a.activeWitnessRetrievalWorkers--
a.activeWitnessRetrievalWorkersMutex.Unlock()
}

func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
Expand Down Expand Up @@ -568,7 +542,9 @@ func (a *Aggregator) Start(ctx context.Context) error {
}()

// Witness retrieval workers
go a.retrieveWitnesses()
for i := 0; i < a.cfg.MaxWitnessRetrievalWorkers; i++ {
go a.retrieveWitness()
}

// Start stream client
err = a.streamClient.Start()
Expand Down

0 comments on commit f762b8f

Please sign in to comment.