Skip to content

Commit

Permalink
Fix: return attestation err result to dispatcher (#1011)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Dec 17, 2024
1 parent be75ef1 commit fc5dff1
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 28 deletions.
12 changes: 12 additions & 0 deletions core/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In
// Aggregate the aggregated signatures. We reuse the first aggregated signature as the accumulator
var aggSig *Signature
for _, quorumID := range quorumIDs {
if quorumAttestation.AggSignature[quorumID] == nil {
a.Logger.Error("cannot aggregate signature for quorum because aggregated signature is nil", "quorumID", quorumID)
continue
}
sig := quorumAttestation.AggSignature[quorumID]
if aggSig == nil {
aggSig = &Signature{sig.G1Point.Clone()}
Expand All @@ -284,6 +288,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In
// Aggregate the aggregated public keys. We reuse the first aggregated public key as the accumulator
var aggPubKey *G2Point
for _, quorumID := range quorumIDs {
if quorumAttestation.SignersAggPubKey[quorumID] == nil {
a.Logger.Error("cannot aggregate public key for quorum because signers aggregated public key is nil", "quorumID", quorumID)
continue
}
apk := quorumAttestation.SignersAggPubKey[quorumID]
if aggPubKey == nil {
aggPubKey = apk.Clone()
Expand Down Expand Up @@ -315,6 +323,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In

quorumAggKeys := make(map[QuorumID]*G1Point, len(quorumIDs))
for _, quorumID := range quorumIDs {
if quorumAttestation.QuorumAggPubKey[quorumID] == nil {
a.Logger.Error("cannot aggregate public key for quorum because aggregated public key is nil", "quorumID", quorumID)
continue
}
quorumAggKeys[quorumID] = quorumAttestation.QuorumAggPubKey[quorumID]
}

Expand Down
16 changes: 9 additions & 7 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,22 +314,24 @@ type Attestation struct {
QuorumNumbers []core.QuorumID
}

func (a *Attestation) ToProtobuf() *disperserpb.Attestation {
func (a *Attestation) ToProtobuf() (*disperserpb.Attestation, error) {
nonSignerPubKeys := make([][]byte, len(a.NonSignerPubKeys))
for i, p := range a.NonSignerPubKeys {
pubkeyBytes := p.Bytes()
nonSignerPubKeys[i] = pubkeyBytes[:]
}

quorumAPKs := make([][]byte, len(a.QuorumAPKs))
for i, p := range a.QuorumAPKs {
apkBytes := p.Bytes()
quorumAPKs[i] = apkBytes[:]
}

quorumNumbers := make([]uint32, len(a.QuorumNumbers))
for i, q := range a.QuorumNumbers {
quorumNumbers[i] = uint32(q)

apk, ok := a.QuorumAPKs[q]
if !ok {
return nil, fmt.Errorf("missing quorum APK for quorum %d", q)
}
apkBytes := apk.Bytes()
quorumAPKs[i] = apkBytes[:]
}

apkG2Bytes := a.APKG2.Bytes()
Expand All @@ -341,7 +343,7 @@ func (a *Attestation) ToProtobuf() *disperserpb.Attestation {
QuorumApks: quorumAPKs,
Sigma: sigmaBytes[:],
QuorumNumbers: quorumNumbers,
}
}, nil
}

type BlobVerificationInfo struct {
Expand Down
8 changes: 7 additions & 1 deletion disperser/apiserver/get_blob_status_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,18 @@ func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatu
continue
}

attestationProto, err := attestation.ToProtobuf()
if err != nil {
s.logger.Error("failed to convert attestation to protobuf", "err", err, "blobKey", blobKey.Hex())
continue
}

// return the first signed batch found
return &pb.BlobStatusReply{
Status: metadata.BlobStatus.ToProfobuf(),
SignedBatch: &pb.SignedBatch{
Header: batchHeader.ToProtobuf(),
Attestation: attestation.ToProtobuf(),
Attestation: attestationProto,
},
BlobVerificationInfo: blobVerificationInfoProto,
}, nil
Expand Down
3 changes: 2 additions & 1 deletion disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ func TestV2GetBlobStatus(t *testing.T) {
require.Equal(t, verificationInfo0.InclusionProof, reply.GetBlobVerificationInfo().GetInclusionProof())
require.Equal(t, batchHeader.BatchRoot[:], reply.GetSignedBatch().GetHeader().BatchRoot)
require.Equal(t, batchHeader.ReferenceBlockNumber, reply.GetSignedBatch().GetHeader().ReferenceBlockNumber)
attestationProto := attestation.ToProtobuf()
attestationProto, err := attestation.ToProtobuf()
require.NoError(t, err)
require.Equal(t, attestationProto, reply.GetSignedBatch().GetAttestation())
}

Expand Down
61 changes: 45 additions & 16 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -99,7 +100,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
sigChan, batchData, err := d.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToDispatch) {
d.logger.Warn("no blobs to dispatch")
d.logger.Debug("no blobs to dispatch")
} else {
d.logger.Error("failed to process a batch", "err", err)
}
Expand All @@ -110,6 +111,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
if err != nil {
d.logger.Error("failed to handle signatures", "err", err)
}
close(sigChan)
// TODO(ian-shim): handle errors and mark failed
}()
}
Expand Down Expand Up @@ -172,15 +174,24 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
err := d.blobMetadataStore.PutDispersalRequest(ctx, req)
if err != nil {
d.logger.Error("failed to put dispersal request", "err", err)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: err,
}
return
}

d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart))

var i int
var lastErr error
for i = 0; i < d.NumRequestRetries+1; i++ {
sendChunksStart := time.Now()
sig, err := d.sendChunks(ctx, client, batch)
lastErr = err
sendChunksFinished := time.Now()
d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart))
if err == nil {
Expand All @@ -200,16 +211,26 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
Signature: sig,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0, // TODO: calculate latency
AttestationLatencyMs: float64(time.Since(sendChunksStart)),
Err: nil,
}

break
}

d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "err", err)
d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
}

if lastErr != nil {
d.logger.Error("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", lastErr)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: lastErr,
}
}
d.metrics.reportSendChunksRetryCount(float64(i))
})

Expand All @@ -221,29 +242,36 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,

// HandleSignatures receives signatures from operators, validates, and aggregates them
func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error {
if batchData == nil {
return errors.New("batchData is required")
}
handleSignaturesStart := time.Now()
defer func() {
d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart))
}()

batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:])
quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
return fmt.Errorf("failed to receive and validate signatures: %w", err)
return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err)
}
receiveSignaturesFinished := time.Now()
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults))
i := 0
nonZeroQuorums := make([]core.QuorumID, 0)
for quorumID := range quorumAttestation.QuorumResults {
quorums[i] = quorumID
i++
d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID])
nonZeroQuorums = append(nonZeroQuorums, quorumID)
}
aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums)
if len(nonZeroQuorums) == 0 {
return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
}

aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonZeroQuorums)
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to aggregate signatures: %w", err)
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
Expand All @@ -253,21 +281,22 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
APKG2: aggSig.AggPubKey,
QuorumAPKs: aggSig.QuorumAggPubKeys,
Sigma: aggSig.AggSignature,
QuorumNumbers: quorums,
QuorumNumbers: nonZeroQuorums,
})
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation: %w", err)
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified: %w", err)
return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
}

d.logger.Debug("successfully processed batch", "batchHeader", batchHeaderHash)
return nil
}

Expand All @@ -286,16 +315,16 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}

d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas))
if len(blobMetadatas) == 0 {
return nil, errNoBlobsToDispatch
}
d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas), "referenceBlockNumber", referenceBlockNumber)

state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber)
getOperatorStateFinished := time.Now()
d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished))
if err != nil {
return nil, fmt.Errorf("failed to get operator state: %w", err)
return nil, fmt.Errorf("failed to get operator state at block %d: %w", referenceBlockNumber, err)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
Expand Down
4 changes: 3 additions & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *EncodingManager) Start(ctx context.Context) error {
err := e.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToEncode) {
e.logger.Warn("no blobs to encode")
e.logger.Debug("no blobs to encode")
} else {
e.logger.Error("failed to process a batch", "err", err)
}
Expand Down Expand Up @@ -263,6 +263,8 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
if cursor != nil {
e.cursor = cursor
}

e.logger.Debug("successfully submitted encoding requests", "numBlobs", len(blobMetadatas))
return nil
}

Expand Down
11 changes: 9 additions & 2 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"encoding/hex"
"fmt"
"runtime"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
Expand All @@ -14,8 +17,6 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/shirou/gopsutil/mem"
"runtime"
"time"
)

// ServerV2 implements the Node v2 proto APIs.
Expand Down Expand Up @@ -76,6 +77,12 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
if s.node.StoreV2 == nil {
return nil, api.NewErrorInternal("v2 store not initialized")
}

// TODO(ian-shim): support remote signer
if s.node.KeyPair == nil {
return nil, api.NewErrorInternal("missing key pair")
}

batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err
Expand Down

0 comments on commit fc5dff1

Please sign in to comment.