Skip to content

Commit

Permalink
dynamic-lambda: continue tracking the best proposal even after freezi…
Browse files Browse the repository at this point in the history
…ng (#5701)

Co-authored-by: cce <[email protected]>
Co-authored-by: Nickolai Zeldovich <[email protected]>
Co-authored-by: Pavel Zbitskiy <[email protected]>
Co-authored-by: John Jannotti <[email protected]>
Co-authored-by: Jason Paulos <[email protected]>
  • Loading branch information
6 people authored Oct 13, 2023
1 parent 0358d1d commit 4249027
Show file tree
Hide file tree
Showing 17 changed files with 1,192 additions and 336 deletions.
47 changes: 47 additions & 0 deletions agreement/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type action interface {
do(context.Context, *Service)

String() string
ComparableStr() string
}

type nonpersistent struct{}
Expand All @@ -88,6 +89,8 @@ func (a noopAction) String() string {
return a.t().String()
}

func (a noopAction) ComparableStr() string { return a.String() }

type networkAction struct {
nonpersistent

Expand Down Expand Up @@ -120,6 +123,13 @@ func (a networkAction) String() string {
return fmt.Sprintf("%s: %2v", a.t().String(), a.Tag)
}

func (a networkAction) ComparableStr() string {
if a.Tag == protocol.AgreementVoteTag {
return fmt.Sprintf("%s: %2v: %3v-%2v-%2v", a.t().String(), a.Tag, a.UnauthenticatedVote.R.Round, a.UnauthenticatedVote.R.Period, a.UnauthenticatedVote.R.Step)
}
return a.String()
}

func (a networkAction) do(ctx context.Context, s *Service) {
if a.T == broadcastVotes {
tag := protocol.AgreementVoteTag
Expand Down Expand Up @@ -192,6 +202,18 @@ func (a cryptoAction) String() string {
return a.t().String()
}

func (a cryptoAction) ComparableStr() (s string) {
switch a.T {
case verifyVote:
s = fmt.Sprintf("%s: %3v-%2v TaskIndex %d", a.t().String(), a.Round, a.Period, a.TaskIndex)
case verifyPayload:
s = fmt.Sprintf("%s: %3v-%2v Pinned %v", a.t().String(), a.Round, a.Period, a.Pinned)
case verifyBundle:
s = fmt.Sprintf("%s: %3v-%2v-%2v", a.t().String(), a.Round, a.Period, a.Step)
}
return
}

func (a cryptoAction) do(ctx context.Context, s *Service) {
switch a.T {
case verifyVote:
Expand Down Expand Up @@ -225,6 +247,8 @@ func (a ensureAction) String() string {
return fmt.Sprintf("%s: %.5s: %v, %v, %.5s", a.t().String(), a.Payload.Digest().String(), a.Certificate.Round, a.Certificate.Period, a.Certificate.Proposal.BlockDigest.String())
}

func (a ensureAction) ComparableStr() string { return a.String() }

func (a ensureAction) do(ctx context.Context, s *Service) {
logEvent := logspec.AgreementEvent{
Hash: a.Certificate.Proposal.BlockDigest.String(),
Expand Down Expand Up @@ -288,6 +312,8 @@ func (a stageDigestAction) String() string {
return fmt.Sprintf("%s: %.5s. %v. %v", a.t().String(), a.Certificate.Proposal.BlockDigest.String(), a.Certificate.Round, a.Certificate.Period)
}

func (a stageDigestAction) ComparableStr() string { return a.String() }

func (a stageDigestAction) do(ctx context.Context, service *Service) {
logEvent := logspec.AgreementEvent{
Hash: a.Certificate.Proposal.BlockDigest.String(),
Expand All @@ -314,8 +340,25 @@ func (a rezeroAction) String() string {
return a.t().String()
}

func (a rezeroAction) ComparableStr() string {
return fmt.Sprintf("%s: %d", a.t().String(), a.Round)
}

func (a rezeroAction) do(ctx context.Context, s *Service) {
s.Clock = s.Clock.Zero()
// Preserve the zero time of the new round a.Round (for
// period 0) for future use if a late proposal-vote arrives,
// for late credential tracking.
if _, ok := s.historicalClocks[a.Round]; !ok {
s.historicalClocks[a.Round] = s.Clock
}

// Garbage collect clocks that are too old
for rnd := range s.historicalClocks {
if a.Round > rnd+credentialRoundLag {
delete(s.historicalClocks, rnd)
}
}
}

type pseudonodeAction struct {
Expand All @@ -336,6 +379,8 @@ func (a pseudonodeAction) String() string {
return fmt.Sprintf("%v %3v-%2v-%2v: %.5v", a.t().String(), a.Round, a.Period, a.Step, a.Proposal.BlockDigest.String())
}

func (a pseudonodeAction) ComparableStr() string { return a.String() }

func (a pseudonodeAction) persistent() bool {
return a.T == attest
}
Expand Down Expand Up @@ -528,3 +573,5 @@ func (c checkpointAction) do(ctx context.Context, s *Service) {
func (c checkpointAction) String() string {
return c.t().String()
}

func (c checkpointAction) ComparableStr() string { return c.String() }
6 changes: 3 additions & 3 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ func (d *demux) next(s *Service, deadline Deadline, fastDeadline Deadline, curre

switch e.t() {
case payloadVerified:
e = e.(messageEvent).AttachValidatedAt(s.Clock.Since(), currentRound)
e = e.(messageEvent).AttachValidatedAt(clockForRound(currentRound, s.Clock, s.historicalClocks))
case payloadPresent, votePresent:
e = e.(messageEvent).AttachReceivedAt(s.Clock.Since(), currentRound)
e = e.(messageEvent).AttachReceivedAt(clockForRound(currentRound, s.Clock, s.historicalClocks))
case voteVerified:
// if this is a proposal vote (step 0), record the validatedAt time on the vote
if e.(messageEvent).Input.Vote.R.Step == 0 {
e = e.(messageEvent).AttachValidatedAt(s.Clock.Since(), currentRound)
e = e.(messageEvent).AttachValidatedAt(clockForRound(currentRound, s.Clock, s.historicalClocks))
}
}
}()
Expand Down
95 changes: 68 additions & 27 deletions agreement/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (e messageEvent) String() string {
}

func (e messageEvent) ComparableStr() string {
return e.T.String()
return fmt.Sprintf("{T:%s %d Err:%v}", e.t().String(), e.ConsensusRound(), e.Err)
}

func (e messageEvent) ConsensusRound() round {
Expand Down Expand Up @@ -424,9 +424,14 @@ type readLowestEvent struct {

// Vote holds the lowest-credential vote.
Vote vote
// LowestIncludingLate holds the lowest-credential vote that was received, including
// after Vote has been frozen.
LowestIncludingLate vote

// Filled indicates whether the Vote field is filled
Filled bool
// Filled and HasLowestIncludingLate indicates whether the Vote or LowestIncludingLate
// fields are filled, respectively.
Filled bool
HasLowestIncludingLate bool
}

func (e readLowestEvent) t() eventType {
Expand Down Expand Up @@ -591,10 +596,36 @@ func (e payloadProcessedEvent) ComparableStr() string {
return fmt.Sprintf("%v: %.5v", e.t().String(), e.Proposal.BlockDigest.String())
}

// LateCredentialTrackingEffect indicates the impact of a vote that was filtered (due to age)
// on the credential tracking system (in credentialArrivalHistory), for the purpose of tracking
// the time it took the best credential to arrive, even if it was late.
type LateCredentialTrackingEffect uint8

const (
// NoLateCredentialTrackingImpact indicates the filtered event would have no impact on
// the credential tracking mechanism.
NoLateCredentialTrackingImpact LateCredentialTrackingEffect = iota

// UnverifiedLateCredentialForTracking indicates the filtered event could impact
// the credential tracking mechanism and more processing (validation) may be required.
// It may be set by proposalManager when handling votePresent events.
UnverifiedLateCredentialForTracking

// VerifiedBetterLateCredentialForTracking indicates that the filtered event provides a new best
// credential for its round.
// It may be set by proposalManager when handling voteVerified events.
VerifiedBetterLateCredentialForTracking
)

type filteredEvent struct {
// {proposal,vote,bundle}{Filtered,Malformed}
T eventType

// LateCredentialTrackingNote indicates the impact of the filtered event on the
// credential tracking machinery used for dynamically setting the filter
// timeout.
LateCredentialTrackingNote LateCredentialTrackingEffect

// Err is the reason cryptographic verification failed and is set for
// events {proposal,vote,bundle}Malformed.
Err *serializableError
Expand Down Expand Up @@ -975,50 +1006,60 @@ func (e checkpointEvent) AttachConsensusVersion(v ConsensusVersionView) external
return e
}

// This timestamp is assigned to messages that arrive for round R+1 while the current player
// is still waiting for quorum on R.
const pipelinedMessageTimestamp = time.Nanosecond

//msgp:ignore constantRoundStartTimer
type constantRoundStartTimer time.Duration

func (c constantRoundStartTimer) Since() time.Duration { return time.Duration(c) }

// clockForRound retrieves the roundStartTimer used for AttachValidatedAt and AttachReceivedAt.
func clockForRound(currentRound round, currentClock roundStartTimer, historicalClocks map[round]roundStartTimer) func(round) roundStartTimer {
return func(eventRound round) roundStartTimer {
if eventRound > currentRound {
return constantRoundStartTimer(pipelinedMessageTimestamp)
}
if eventRound == currentRound {
return currentClock
}
if clock, ok := historicalClocks[eventRound]; ok {
return clock
}
return constantRoundStartTimer(0)
}
}

// AttachValidatedAt looks for a validated proposal or vote inside a
// payloadVerified or voteVerified messageEvent, and attaches the given time to
// the proposal's validatedAt field.
func (e messageEvent) AttachValidatedAt(d time.Duration, currentRound round) messageEvent {
func (e messageEvent) AttachValidatedAt(getClock func(eventRound round) roundStartTimer) messageEvent {
switch e.T {
case payloadVerified:
if e.Input.Proposal.Round() > currentRound {
e.Input.Proposal.validatedAt = 1
} else {
e.Input.Proposal.validatedAt = d
}
e.Input.Proposal.validatedAt = getClock(e.Input.Proposal.Round()).Since()
case voteVerified:
if e.Input.Vote.R.Round > currentRound {
e.Input.Vote.validatedAt = 1
} else {
e.Input.Vote.validatedAt = d
}
e.Input.Vote.validatedAt = getClock(e.Input.Vote.R.Round).Since()
}
return e
}

// AttachReceivedAt looks for an unauthenticatedProposal inside a
// payloadPresent or votePresent messageEvent, and attaches the given
// time to the proposal's receivedAt field.
func (e messageEvent) AttachReceivedAt(d time.Duration, currentRound round) messageEvent {
if e.T == payloadPresent {
if e.Input.UnauthenticatedProposal.Round() > currentRound {
e.Input.UnauthenticatedProposal.receivedAt = 1
} else {
e.Input.UnauthenticatedProposal.receivedAt = d
}
} else if e.T == votePresent {
func (e messageEvent) AttachReceivedAt(getClock func(eventRound round) roundStartTimer) messageEvent {
switch e.T {
case payloadPresent:
e.Input.UnauthenticatedProposal.receivedAt = getClock(e.Input.UnauthenticatedProposal.Round()).Since()
case votePresent:
// Check for non-nil Tail, indicating this votePresent event
// contains a synthetic payloadPresent event that was attached
// to it by setupCompoundMessage.
if e.Tail != nil && e.Tail.T == payloadPresent {
// The tail event is payloadPresent, serialized together
// with the proposal vote as a single CompoundMessage
// using a protocol.ProposalPayloadTag network message.
if e.Tail.Input.UnauthenticatedProposal.Round() > currentRound {
e.Tail.Input.UnauthenticatedProposal.receivedAt = 1
} else {
e.Tail.Input.UnauthenticatedProposal.receivedAt = d
}
e.Tail.Input.UnauthenticatedProposal.receivedAt = getClock(e.Tail.Input.UnauthenticatedProposal.Round()).Since()
}
}
return e
Expand Down
70 changes: 70 additions & 0 deletions agreement/msgp_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions agreement/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ func persistent(as []action) bool {
// encode serializes the current state into a byte array.
func encode(t timers.Clock[TimeoutType], rr rootRouter, p player, a []action, reflect bool) (raw []byte) {
var s diskState

// Don't persist state for old rounds
// rootRouter.update() may preserve roundRouters from credentialRoundLag rounds ago
children := make(map[round]*roundRouter)
for rnd, rndRouter := range rr.Children {
if rnd >= p.Round {
children[rnd] = rndRouter
}
}
if len(children) == 0 {
rr.Children = nil
} else {
rr.Children = children
}

if reflect {
s.Router = protocol.EncodeReflect(rr)
s.Player = protocol.EncodeReflect(p)
Expand Down
Loading

0 comments on commit 4249027

Please sign in to comment.