Skip to content

Commit

Permalink
skip superset aggregatation (#12019)
Browse files Browse the repository at this point in the history
  • Loading branch information
domiwei authored Sep 18, 2024
1 parent de30934 commit 72f749b
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 29 deletions.
6 changes: 1 addition & 5 deletions cl/aggregation/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
func (p *aggregationPoolImpl) GetAggregatationByRoot(root common.Hash) *solid.Attestation {
p.aggregatesLock.RLock()
defer p.aggregatesLock.RUnlock()
att := p.aggregates[root]
if att == nil {
return nil
}
return att.Copy()
return p.aggregates[root]
}

func (p *aggregationPoolImpl) sweepStaleAtt(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
return err
}

if g.committeeSub.NeedToAggregate(obj.Attestation.AttestantionData().CommitteeIndex()) {
if g.committeeSub.NeedToAggregate(obj.Attestation) {
return g.attestationService.ProcessMessage(ctx, data.SubnetId, obj)
}

Expand Down
6 changes: 5 additions & 1 deletion cl/phase1/network/services/attestation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,17 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return fmt.Errorf("invalid finalized checkpoint %w", ErrIgnore)
}

if !s.committeeSubscribe.NeedToAggregate(att.Attestation) {
return ErrIgnore
}

aggregateVerificationData := &AggregateVerificationData{
Signatures: [][]byte{signature[:]},
SignRoots: [][]byte{signingRoot[:]},
Pks: [][]byte{pubKey[:]},
GossipData: att.GossipData,
F: func() {
err = s.committeeSubscribe.CheckAggregateAttestation(att.Attestation)
err = s.committeeSubscribe.AggregateAttestation(att.Attestation)
if errors.Is(err, aggregation.ErrIsSuperset) {
return
}
Expand Down
3 changes: 2 additions & 1 deletion cl/phase1/network/services/attestation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
t.mockForkChoice.FinalizedCheckpointVal = solid.NewCheckpointFromParameters(
mockFinalizedCheckPoint.BlockRoot(),
mockFinalizedCheckPoint.Epoch())
t.committeeSubscibe.EXPECT().CheckAggregateAttestation(att).Return(nil).Times(1)
t.committeeSubscibe.EXPECT().NeedToAggregate(att).Return(true).Times(1)
t.committeeSubscibe.EXPECT().AggregateAttestation(att).Return(nil).Times(1)
},
args: args{
ctx: context.Background(),
Expand Down
23 changes: 19 additions & 4 deletions cl/validator/committee_subscription/committee_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/erigontech/erigon/cl/gossip"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/phase1/network/subnets"
"github.com/erigontech/erigon/cl/utils"
"github.com/erigontech/erigon/cl/utils/eth_clock"
)

Expand Down Expand Up @@ -135,7 +136,7 @@ func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context,
return nil
}

func (c *CommitteeSubscribeMgmt) CheckAggregateAttestation(att *solid.Attestation) error {
func (c *CommitteeSubscribeMgmt) AggregateAttestation(att *solid.Attestation) error {
committeeIndex := att.AttestantionData().CommitteeIndex()
c.validatorSubsMutex.RLock()
defer c.validatorSubsMutex.RUnlock()
Expand All @@ -148,11 +149,25 @@ func (c *CommitteeSubscribeMgmt) CheckAggregateAttestation(att *solid.Attestatio
return nil
}

func (c *CommitteeSubscribeMgmt) NeedToAggregate(committeeIndex uint64) bool {
func (c *CommitteeSubscribeMgmt) NeedToAggregate(att *solid.Attestation) bool {
var (
committeeIndex = att.AttestantionData().CommitteeIndex()
)

c.validatorSubsMutex.RLock()
defer c.validatorSubsMutex.RUnlock()
if sub, ok := c.validatorSubs[committeeIndex]; ok {
return sub.aggregate
if sub, ok := c.validatorSubs[committeeIndex]; ok && sub.aggregate {
root, err := att.AttestantionData().HashSSZ()
if err != nil {
log.Warn("failed to hash attestation data", "err", err)
return false
}
aggregation := c.aggregationPool.GetAggregatationByRoot(root)
if aggregation == nil ||
!utils.IsNonStrictSupersetBitlist(aggregation.AggregationBits(), att.AggregationBits()) {
// the on bit is not set. need to aggregate
return true
}
}
return false
}
Expand Down
4 changes: 2 additions & 2 deletions cl/validator/committee_subscription/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ import (
//go:generate mockgen -typed=true -destination=./mock_services/committee_subscribe_mock.go -package=mock_services . CommitteeSubscribe
type CommitteeSubscribe interface {
AddAttestationSubscription(ctx context.Context, p *cltypes.BeaconCommitteeSubscription) error
CheckAggregateAttestation(att *solid.Attestation) error
NeedToAggregate(committeeIndex uint64) bool
AggregateAttestation(att *solid.Attestation) error
NeedToAggregate(att *solid.Attestation) bool
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 72f749b

Please sign in to comment.