Skip to content

Commit

Permalink
feat: epoch notifier (#144)
Browse files Browse the repository at this point in the history
- Send certificates after a percentage of epoch
- Require epoch configuration to AggLayer
- Change config of `aggsender` adding: `BlockFinality` and `EpochNotificationPercentage`
  • Loading branch information
joanestebanr authored Nov 12, 2024
1 parent 7545d8f commit dbc47e1
Show file tree
Hide file tree
Showing 37 changed files with 3,041 additions and 1,399 deletions.
30 changes: 29 additions & 1 deletion agglayer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ import (

const errCodeAgglayerRateLimitExceeded int = -10007

var ErrAgglayerRateLimitExceeded = fmt.Errorf("agglayer rate limit exceeded")
var (
ErrAgglayerRateLimitExceeded = fmt.Errorf("agglayer rate limit exceeded")
jSONRPCCall = rpc.JSONRPCCall
)

type AggLayerClientGetEpochConfiguration interface {
GetEpochConfiguration() (*ClockConfiguration, error)
}

// AgglayerClientInterface is the interface that defines the methods that the AggLayerClient will implement
type AgglayerClientInterface interface {
SendTx(signedTx SignedTx) (common.Hash, error)
WaitTxToBeMined(hash common.Hash, ctx context.Context) error
SendCertificate(certificate *SignedCertificate) (common.Hash, error)
GetCertificateHeader(certificateHash common.Hash) (*CertificateHeader, error)
AggLayerClientGetEpochConfiguration
}

// AggLayerClient is the client that will be used to interact with the AggLayer
Expand Down Expand Up @@ -130,3 +138,23 @@ func (c *AggLayerClient) GetCertificateHeader(certificateHash common.Hash) (*Cer

return result, nil
}

// GetEpochConfiguration returns the clock configuration of AggLayer
func (c *AggLayerClient) GetEpochConfiguration() (*ClockConfiguration, error) {
response, err := jSONRPCCall(c.url, "interop_getEpochConfiguration")
if err != nil {
return nil, err
}

if response.Error != nil {
return nil, fmt.Errorf("GetEpochConfiguration code=%d msg=%s", response.Error.Code, response.Error.Message)
}

var result *ClockConfiguration
err = json.Unmarshal(response.Result, &result)
if err != nil {
return nil, err
}

return result, nil
}
76 changes: 76 additions & 0 deletions agglayer/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package agglayer

import (
"fmt"
"testing"

"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/stretchr/testify/require"
)

const (
testURL = "http://localhost:8080"
)

func TestExploratoryClient(t *testing.T) {
t.Skip("This test is for exploratory purposes only")
sut := NewAggLayerClient("http://127.0.0.1:32853")
config, err := sut.GetEpochConfiguration()
require.NoError(t, err)
require.NotNil(t, config)
fmt.Printf("Config: %s", config.String())
}

func TestGetEpochConfigurationResponseWithError(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Error: &rpc.ErrorObject{},
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetEpochConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetEpochConfigurationResponseBadJson(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Result: []byte(`{`),
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetEpochConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetEpochConfigurationErrorResponse(t *testing.T) {
sut := NewAggLayerClient(testURL)

jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return rpc.Response{}, fmt.Errorf("unittest error")
}
clockConfig, err := sut.GetEpochConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetEpochConfigurationOkResponse(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Result: []byte(`{"epoch_duration": 1, "genesis_block": 1}`),
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetEpochConfiguration()
require.NotNil(t, clockConfig)
require.NoError(t, err)
require.Equal(t, ClockConfiguration{
EpochDuration: 1,
GenesisBlock: 1,
}, *clockConfig)
}
32 changes: 31 additions & 1 deletion agglayer/mock_agglayer_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions agglayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,14 @@ func (c *CertificateHeader) UnmarshalJSON(data []byte) error {

return nil
}

// ClockConfiguration represents the configuration of the epoch clock
// returned by the interop_GetEpochConfiguration RPC call
type ClockConfiguration struct {
EpochDuration uint64 `json:"epoch_duration"`
GenesisBlock uint64 `json:"genesis_block"`
}

func (c ClockConfiguration) String() string {
return fmt.Sprintf("EpochDuration: %d, GenesisBlock: %d", c.EpochDuration, c.GenesisBlock)
}
79 changes: 43 additions & 36 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"fmt"
"math/big"
"os"
"slices"
"time"

"github.com/0xPolygon/cdk/agglayer"
"github.com/0xPolygon/cdk/aggsender/db"
aggsendertypes "github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/bridgesync"
cdkcommon "github.com/0xPolygon/cdk/common"
"github.com/0xPolygon/cdk/l1infotreesync"
Expand All @@ -33,10 +34,11 @@ var (

// AggSender is a component that will send certificates to the aggLayer
type AggSender struct {
log aggsendertypes.Logger
log types.Logger

l2Syncer aggsendertypes.L2BridgeSyncer
l1infoTreeSyncer aggsendertypes.L1InfoTreeSyncer
l2Syncer types.L2BridgeSyncer
l1infoTreeSyncer types.L1InfoTreeSyncer
epochNotifier types.EpochNotifier

storage db.AggSenderStorage
aggLayerClient agglayer.AgglayerClientInterface
Expand All @@ -53,7 +55,8 @@ func New(
cfg Config,
aggLayerClient agglayer.AgglayerClientInterface,
l1InfoTreeSyncer *l1infotreesync.L1InfoTreeSync,
l2Syncer *bridgesync.BridgeSync) (*AggSender, error) {
l2Syncer *bridgesync.BridgeSync,
epochNotifier types.EpochNotifier) (*AggSender, error) {
storage, err := db.NewAggSenderSQLStorage(logger, cfg.StoragePath)
if err != nil {
return nil, err
Expand All @@ -74,24 +77,30 @@ func New(
aggLayerClient: aggLayerClient,
l1infoTreeSyncer: l1InfoTreeSyncer,
sequencerKey: sequencerPrivateKey,
epochNotifier: epochNotifier,
}, nil
}

// Start starts the AggSender
func (a *AggSender) Start(ctx context.Context) {
go a.sendCertificates(ctx)
go a.checkIfCertificatesAreSettled(ctx)
a.sendCertificates(ctx)
}

// sendCertificates sends certificates to the aggLayer
func (a *AggSender) sendCertificates(ctx context.Context) {
ticker := time.NewTicker(a.cfg.BlockGetInterval.Duration)

chEpoch := a.epochNotifier.Subscribe("aggsender")
for {
select {
case <-ticker.C:
if _, err := a.sendCertificate(ctx); err != nil {
log.Error(err)
case epoch := <-chEpoch:
a.log.Infof("Epoch received: %s", epoch.String())
thereArePendingCerts, err := a.checkPendingCertificatesStatus(ctx)
if err == nil && !thereArePendingCerts {
if _, err := a.sendCertificate(ctx); err != nil {
log.Error(err)
}
} else {
log.Warnf("Skipping epoch %s because there are pending certificates %v or error: %w",
epoch.String(), thereArePendingCerts, err)
}
case <-ctx.Done():
a.log.Info("AggSender stopped")
Expand Down Expand Up @@ -183,7 +192,7 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
}

createdTime := time.Now().UTC().UnixMilli()
certInfo := aggsendertypes.CertificateInfo{
certInfo := types.CertificateInfo{
Height: certificate.Height,
CertificateID: certificateHash,
NewLocalExitRoot: certificate.NewLocalExitRoot,
Expand Down Expand Up @@ -224,7 +233,7 @@ func (a *AggSender) saveCertificateToFile(signedCertificate *agglayer.SignedCert

// getNextHeightAndPreviousLER returns the height and previous LER for the new certificate
func (a *AggSender) getNextHeightAndPreviousLER(
lastSentCertificateInfo *aggsendertypes.CertificateInfo) (uint64, common.Hash) {
lastSentCertificateInfo *types.CertificateInfo) (uint64, common.Hash) {
height := lastSentCertificateInfo.Height + 1
if lastSentCertificateInfo.Status == agglayer.InError {
// previous certificate was in error, so we need to resend it
Expand All @@ -247,7 +256,7 @@ func (a *AggSender) getNextHeightAndPreviousLER(
func (a *AggSender) buildCertificate(ctx context.Context,
bridges []bridgesync.Bridge,
claims []bridgesync.Claim,
lastSentCertificateInfo aggsendertypes.CertificateInfo,
lastSentCertificateInfo types.CertificateInfo,
toBlock uint64) (*agglayer.Certificate, error) {
if len(bridges) == 0 && len(claims) == 0 {
return nil, errNoBridgesAndClaims
Expand Down Expand Up @@ -475,34 +484,30 @@ func (a *AggSender) signCertificate(certificate *agglayer.Certificate) (*agglaye
}, nil
}

// checkIfCertificatesAreSettled checks if certificates are settled
func (a *AggSender) checkIfCertificatesAreSettled(ctx context.Context) {
ticker := time.NewTicker(a.cfg.CheckSettledInterval.Duration)
for {
select {
case <-ticker.C:
a.checkPendingCertificatesStatus(ctx)
case <-ctx.Done():
return
}
}
}

// checkPendingCertificatesStatus checks the status of pending certificates
// and updates in the storage if it changed on agglayer
func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) {
// It returns:
// bool -> if there are pending certificates
// error -> if there was an error
func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) (bool, error) {
pendingCertificates, err := a.storage.GetCertificatesByStatus(nonSettledStatuses)
if err != nil {
a.log.Errorf("error getting pending certificates: %w", err)
return
err = fmt.Errorf("error getting pending certificates: %w", err)
a.log.Error(err)
return true, err
}
thereArePendingCertificates := false
a.log.Debugf("checkPendingCertificatesStatus num of pendingCertificates: %d", len(pendingCertificates))
for _, certificate := range pendingCertificates {
certificateHeader, err := a.aggLayerClient.GetCertificateHeader(certificate.CertificateID)
if err != nil {
a.log.Errorf("error getting certificate header of %s from agglayer: %w",
certificate.String(), err)
continue
err = fmt.Errorf("error getting certificate header of %d/%s from agglayer: %w",
certificate.Height, certificate.String(), err)
a.log.Error(err)
return true, err
}
if slices.Contains(nonSettledStatuses, certificateHeader.Status) {
thereArePendingCertificates = true
}
a.log.Debugf("aggLayerClient.GetCertificateHeader status [%s] of certificate %s ",
certificateHeader.Status,
Expand All @@ -516,11 +521,13 @@ func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) {
certificate.UpdatedAt = time.Now().UTC().UnixMilli()

if err := a.storage.UpdateCertificateStatus(ctx, *certificate); err != nil {
a.log.Errorf("error updating certificate %s status in storage: %w", certificateHeader.String(), err)
continue
err = fmt.Errorf("error updating certificate %s status in storage: %w", certificateHeader.String(), err)
a.log.Error(err)
return true, err
}
}
}
return thereArePendingCertificates, nil
}

// shouldSendCertificate checks if a certificate should be sent at given time
Expand Down
Loading

0 comments on commit dbc47e1

Please sign in to comment.