Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Settiling with agglayer #87

Merged
merged 4 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions aggregator/agglayer_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package aggregator

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/0xPolygon/cdk-rpc/types"
"github.com/ethereum/go-ethereum/common"
)

// AgglayerClientInterface is the interface that defines the methods that the AggLayerClient will implement
type AgglayerClientInterface interface {
SendTx(signedTx SignedTx) (common.Hash, error)
WaitTxToBeMined(hash common.Hash, ctx context.Context) error
}

// AggLayerClient is the client that will be used to interact with the AggLayer
type AggLayerClient struct {
url string
}

// NewAggLayerClient returns a client ready to be used
func NewAggLayerClient(url string) *AggLayerClient {
return &AggLayerClient{
url: url,
}
}

// SendTx sends a signed transaction to the AggLayer
func (c *AggLayerClient) SendTx(signedTx SignedTx) (common.Hash, error) {
response, err := rpc.JSONRPCCall(c.url, "interop_sendTx", signedTx)
if err != nil {
return common.Hash{}, err
}

if response.Error != nil {
return common.Hash{}, fmt.Errorf("%v %v", response.Error.Code, response.Error.Message)
}

var result types.ArgHash
err = json.Unmarshal(response.Result, &result)
if err != nil {
return common.Hash{}, err
}

return result.Hash(), nil
}

// WaitTxToBeMined waits for a transaction to be mined
func (c *AggLayerClient) WaitTxToBeMined(hash common.Hash, ctx context.Context) error {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return errors.New("context finished before tx was mined")
case <-ticker.C:
response, err := rpc.JSONRPCCall(c.url, "interop_getTxStatus", hash)
if err != nil {
return err
}

if response.Error != nil {
return fmt.Errorf("%v %v", response.Error.Code, response.Error.Message)
}

var result string
err = json.Unmarshal(response.Result, &result)
if err != nil {
return err
}
if strings.ToLower(result) == "done" {
return nil
}
}
}
}
63 changes: 63 additions & 0 deletions aggregator/agglayer_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package aggregator

import (
"crypto/ecdsa"

"github.com/0xPolygon/cdk-rpc/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)

// ZKP is the struct that contains the zero-knowledge proof
type ZKP struct {
NewStateRoot common.Hash `json:"newStateRoot"`
NewLocalExitRoot common.Hash `json:"newLocalExitRoot"`
Proof types.ArgBytes `json:"proof"`
}

// Tx is the struct that contains the verified batch transaction
type Tx struct {
RollupID uint32
LastVerifiedBatch types.ArgUint64 `json:"lastVerifiedBatch"`
NewVerifiedBatch types.ArgUint64 `json:"newVerifiedBatch"`
ZKP ZKP `json:"ZKP"`
}

// Hash returns a hash that uniquely identifies the tx
func (t *Tx) Hash() common.Hash {
return common.BytesToHash(crypto.Keccak256(
[]byte(t.LastVerifiedBatch.Hex()),
[]byte(t.NewVerifiedBatch.Hex()),
t.ZKP.NewStateRoot[:],
t.ZKP.NewLocalExitRoot[:],
[]byte(t.ZKP.Proof.Hex()),
))
}

// Sign returns a signed batch by the private key
func (t *Tx) Sign(privateKey *ecdsa.PrivateKey) (*SignedTx, error) {
hashToSign := t.Hash()
sig, err := crypto.Sign(hashToSign.Bytes(), privateKey)
if err != nil {
return nil, err
}
return &SignedTx{
Tx: *t,
Signature: sig,
}, nil
}

// SignedTx is the struct that contains the signed batch transaction
type SignedTx struct {
Tx Tx `json:"tx"`
Signature types.ArgBytes `json:"signature"`
}

// Signer returns the address of the signer
func (s *SignedTx) Signer() (common.Address, error) {
pubKey, err := crypto.SigToPub(s.Tx.Hash().Bytes(), s.Signature)
if err != nil {
return common.Address{}, err
}
return crypto.PubkeyToAddress(*pubKey), nil
}
143 changes: 117 additions & 26 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregator

import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,13 +14,14 @@ import (
"time"
"unicode"

"github.com/0xPolygon/cdk-rpc/rpc"
cdkTypes "github.com/0xPolygon/cdk-rpc/types"
"github.com/0xPolygonHermez/zkevm-aggregator/aggregator/metrics"
"github.com/0xPolygonHermez/zkevm-aggregator/aggregator/prover"
"github.com/0xPolygonHermez/zkevm-aggregator/config/types"
ethmanTypes "github.com/0xPolygonHermez/zkevm-aggregator/etherman/types"
"github.com/0xPolygonHermez/zkevm-aggregator/l1infotree"
"github.com/0xPolygonHermez/zkevm-aggregator/log"
"github.com/0xPolygonHermez/zkevm-aggregator/rpclient"
"github.com/0xPolygonHermez/zkevm-aggregator/state"
"github.com/0xPolygonHermez/zkevm-aggregator/state/datastream"
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
Expand Down Expand Up @@ -81,10 +83,17 @@ type Aggregator struct {
srv *grpc.Server
ctx context.Context
exit context.CancelFunc

sequencerPrivateKey *ecdsa.PrivateKey
aggLayerClient AgglayerClientInterface
}

// New creates a new aggregator.
func New(ctx context.Context, cfg Config, stateInterface stateInterface, etherman etherman) (*Aggregator, error) {
func New(
ctx context.Context,
cfg Config,
stateInterface stateInterface,
etherman etherman) (*Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker

switch cfg.TxProfitabilityCheckerType {
Expand Down Expand Up @@ -138,6 +147,20 @@ func New(ctx context.Context, cfg Config, stateInterface stateInterface, etherma
log.Fatalf("failed to create synchronizer client, error: %v", err)
}

var (
aggLayerClient AgglayerClientInterface
sequencerPrivateKey *ecdsa.PrivateKey
)

if cfg.SettlementBackend == AggLayer {
aggLayerClient = NewAggLayerClient(cfg.AggLayerURL)

sequencerPrivateKey, err = newKeyFromKeystore(cfg.SequencerPrivateKey)
if err != nil {
return nil, err
}
}

a := &Aggregator{
cfg: cfg,
state: stateInterface,
Expand All @@ -151,6 +174,8 @@ func New(ctx context.Context, cfg Config, stateInterface stateInterface, etherma
timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,
finalProof: make(chan finalProofMsg),
currentBatchStreamData: []byte{},
aggLayerClient: aggLayerClient,
sequencerPrivateKey: sequencerPrivateKey,
}

// Set function to handle the batches from the data stream
Expand Down Expand Up @@ -599,35 +624,101 @@ func (a *Aggregator) sendFinalProof() {
NewStateRoot: finalBatch.StateRoot.Bytes(),
}

// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.etherman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs, sender)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}

monitoredTxID, err := a.ethTxManager.Add(ctx, to, nil, big.NewInt(0), data, a.cfg.GasOffset, nil)
if err != nil {
log.Errorf("Error Adding TX to ethTxManager: %v", err)
mTxLogger := ethtxmanager.CreateLogger(monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
switch a.cfg.SettlementBackend {
case AggLayer:
if success := a.settleWithAggLayer(ctx, proof, inputs); !success {
continue
}
default:
if success := a.settleDirect(ctx, proof, inputs); !success {
continue
}
}

// process monitored batch verifications before starting a next cycle
a.ethTxManager.ProcessPendingMonitoredTxs(ctx, func(result ethtxmanager.MonitoredTxResult) {
a.handleMonitoredTxResult(result)
})

a.resetVerifyProofTime()
a.endProofVerification()
}
}
}

func (a *Aggregator) settleWithAggLayer(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs) bool {
proofStrNo0x := strings.TrimPrefix(inputs.FinalProof.Proof, "0x")
proofBytes := common.Hex2Bytes(proofStrNo0x)
tx := Tx{
LastVerifiedBatch: cdkTypes.ArgUint64(proof.BatchNumber - 1),
NewVerifiedBatch: cdkTypes.ArgUint64(proof.BatchNumberFinal),
ZKP: ZKP{
NewStateRoot: common.BytesToHash(inputs.NewStateRoot),
NewLocalExitRoot: common.BytesToHash(inputs.NewLocalExitRoot),
Proof: cdkTypes.ArgBytes(proofBytes),
},
RollupID: a.etherman.GetRollupId(),
}
signedTx, err := tx.Sign(a.sequencerPrivateKey)
if err != nil {
log.Errorf("failed to sign tx: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)

return false
}

log.Debug("final proof signedTx: ", signedTx.Tx.ZKP.Proof.Hex())
txHash, err := a.aggLayerClient.SendTx(*signedTx)
if err != nil {
log.Errorf("failed to send tx to the agglayer: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)

return false
}

log.Infof("tx %s sent to agglayer, waiting to be mined", txHash.Hex())
log.Debugf("Timeout set to %f seconds", a.cfg.AggLayerTxTimeout.Duration.Seconds())
waitCtx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(a.cfg.AggLayerTxTimeout.Duration))
defer cancelFunc()
if err := a.aggLayerClient.WaitTxToBeMined(txHash, waitCtx); err != nil {
log.Errorf("agglayer didn't mine the tx: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)

return false
}

return true
}

// settleDirect sends the final proof to the L1 smart contract directly.
func (a *Aggregator) settleDirect(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs) bool {
// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.etherman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs, sender)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return false
}

monitoredTxID, err := a.ethTxManager.Add(ctx, to, nil, big.NewInt(0), data, a.cfg.GasOffset, nil)
if err != nil {
log.Errorf("Error Adding TX to ethTxManager: %v", err)
mTxLogger := ethtxmanager.CreateLogger(monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return false
}

// process monitored batch verifications before starting a next cycle
a.ethTxManager.ProcessPendingMonitoredTxs(ctx, func(result ethtxmanager.MonitoredTxResult) {
a.handleMonitoredTxResult(result)
})

return true
}

func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Context, proof *state.Proof) {
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
proof.GeneratingSince = nil
Expand Down Expand Up @@ -1461,16 +1552,16 @@ func calculateAccInputHash(oldAccInputHash common.Hash, batchData []byte, l1Info

func getWitness(batchNumber uint64, URL string, fullWitness bool) ([]byte, error) {
var witness string
var response rpclient.Response
var response rpc.Response
var err error

if fullWitness {
response, err = rpclient.JSONRPCCall(URL, "zkevm_getBatchWitness", nil, "1", batchNumber, "full")
response, err = rpc.JSONRPCCall(URL, "zkevm_getBatchWitness", "1", batchNumber, "full")
if err != nil {
return nil, err
}
} else {
response, err = rpclient.JSONRPCCall(URL, "zkevm_getBatchWitness", nil, "batch-1", batchNumber)
response, err = rpc.JSONRPCCall(URL, "zkevm_getBatchWitness", "batch-1", batchNumber)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading