Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CosmosChainProcessor - dynamic block time targeting #847

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 135 additions & 19 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,49 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *cosmos.CosmosProvider) *
}

const (
queryTimeout = 5 * time.Second
blockResultsQueryTimeout = 2 * time.Minute
// Timeout for initialization queries and light block query.
queryTimeout = 10 * time.Second

// Timeout for block results query. Longer timeout needed for large blocks.
blockResultsQueryTimeout = 2 * time.Minute

// Back off delay between latest height query retries
latestHeightQueryRetryDelay = 1 * time.Second
latestHeightQueryRetries = 5

// Max retries for failed latest height queries
latestHeightQueryRetries = 5

// Initial query loop minimum duration.
// Rolling average block time will be used after startup.
defaultMinQueryLoopDuration = 1 * time.Second
inSyncNumBlocksThreshold = 2

// How many blocks behind the latest to consider in sync with the chain.
inSyncNumBlocksThreshold = 2

// Fixed delay between block query retry attempts.
blockQueryRetryDelay = 200 * time.Millisecond

// With doubling the 10 second queryTimeout each time, this gives ~3 mins max timeout.
blockQueryRetries = 4

// Each new delta block time will affect the rolling average by 1/n of this.
blockAverageSmoothing = 10

// Delta block times will be excluded from rolling average if they exceed this.
maxConsideredDeltaBlockTimeMs = 15000

// Scenarios for how much time trim should be added to
// target ideal block query window.

// Time trim addition when a block query fails
queryFailureTimeTrimAdditionMs = 73
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to hear how you arrived at these.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used prime numbers (to avoid fixed multiples) that would allow a steady backoff in the case of errors (up to a limit), and a smaller number for successes to fine tune the window when blocks are expected to be available. This allows it to find the ideal window within ~15 blocks and then remain there. Outlier scenarios like one-off blocks that take an unexpectedly long amount of time to become available will not derail the targeting.


// Time trim addition when the block query succeeds
querySuccessTimeTrimAdditionMs = -31

// Time trim addition when the latest block is the same as
// the last successfully queried block
sameBlockTimeTrimAdditionMs = 97
)

// latestClientState is a map of clientID to the latest clientInfo for that client.
Expand Down Expand Up @@ -133,9 +169,82 @@ func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID strin

// queryCyclePersistence holds the variables that should be retained across queryCycles.
type queryCyclePersistence struct {
latestHeight int64
latestQueriedBlock int64
minQueryLoopDuration time.Duration
// latestHeight is the chain height compared against latestQueriedBlock
// in queryCycle; when the two are equal no block queries are made.
latestHeight int64
// latestQueriedBlock is the height of the last successfully queried block.
latestQueriedBlock int64
// latestQueriedBlockTime is the header timestamp of the last successfully
// queried block. It is the zero time until a block has been queried.
latestQueriedBlockTime time.Time
// averageBlockTimeMs is the rolling average, in milliseconds, of the time
// between consecutive block timestamps. It is 0 until the first measurement.
averageBlockTimeMs int64
// minQueryLoopDuration is how long the main query loop waits after a
// queryCycle before starting the next one.
minQueryLoopDuration time.Duration
// timeTrimMs is the extra delay, in milliseconds, added when targeting the
// next block query. addTimeTrimMs keeps it within [0, averageBlockTimeMs].
timeTrimMs int64
}

// addTimeTrimMs adjusts the time trim used when targeting the next ideal
// window for block queries. Callers pass a positive value to push the next
// query later (for example after a block query failed because it was issued
// too early, or when the latest height has not yet incremented) and a smaller
// negative value after a successful query to pull the window back in.
//
// The resulting trim is floored at zero and capped at the average block time,
// since the added delay should never exceed one average block interval.
func (p *queryCyclePersistence) addTimeTrimMs(ms int64) {
	trim := p.timeTrimMs + ms

	switch {
	case trim < 0:
		// Never schedule earlier than the untrimmed target.
		trim = 0
	case trim > p.averageBlockTimeMs:
		// Never delay by more than the average block time.
		trim = p.averageBlockTimeMs
	}

	p.timeTrimMs = trim
}

// dynamicBlockTime targets the next ideal window for a block query, attempting
// to schedule block queries for the moment the next block becomes available.
//
// It folds the time between the previously queried block and latestBlockTime
// into the rolling average block time, then sets p.minQueryLoopDuration to the
// estimated time until the next block, plus the current time trim, minus the
// time already spent on the query.
//
// Parameters:
//   - log: logger used for debug output.
//   - queryStart: when the query for the latest block was initiated.
//   - latestBlockTime: header timestamp of the block that was just queried.
//
// The call leaves p.minQueryLoopDuration untouched when:
//   - p.latestQueriedBlockTime has not been initialized yet,
//   - the delta block time exceeds maxConsideredDeltaBlockTimeMs,
//   - no positive average block time is available, or
//   - queryStart is not after latestBlockTime.
func (p *queryCyclePersistence) dynamicBlockTime(
	log *zap.Logger,
	queryStart time.Time,
	latestBlockTime time.Time,
) {
	if p.latestQueriedBlockTime.IsZero() {
		// latestQueriedBlockTime not yet initialized
		return
	}

	// deltaT between previous block time and latest block time.
	deltaBlockTime := latestBlockTime.Sub(p.latestQueriedBlockTime).Milliseconds()

	if deltaBlockTime > maxConsideredDeltaBlockTimeMs {
		// treat halts and upgrades as outliers
		return
	}

	switch {
	case deltaBlockTime <= 0:
		// A zero or negative delta is not a usable measurement. Folding it in
		// could drive the average to zero, which is used as a divisor below.
	case p.averageBlockTimeMs == 0:
		// initialize average block time with first measurement of deltaT
		p.averageBlockTimeMs = deltaBlockTime
	default:
		// compute rolling average of deltaT; each new sample contributes
		// 1/blockAverageSmoothing of its value.
		weightedComponent := p.averageBlockTimeMs * (blockAverageSmoothing - 1)
		p.averageBlockTimeMs = (weightedComponent + deltaBlockTime) / blockAverageSmoothing
	}

	if p.averageBlockTimeMs <= 0 {
		// Without a positive average the modulo below would panic with an
		// integer divide by zero; wait for a usable measurement instead.
		return
	}

	// calculate deltaT between the block timestamp and when we initiated the query.
	timeQueriedAfterBlockTime := queryStart.Sub(latestBlockTime).Milliseconds()
	if timeQueriedAfterBlockTime <= 0 {
		log.Debug("Unexpected state, query start is before latest block time but query succeeded")
		return
	}

	queryDurationMs := time.Since(queryStart).Milliseconds()

	// Also take into account older blocks, where
	// timeQueriedAfterBlockTime > p.averageBlockTimeMs, by using the remainder.
	// The time trim shifts the target later, and the time already spent on the
	// query is subtracted. The result may be negative when the query took
	// longer than the remaining window.
	targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) +
		p.timeTrimMs - queryDurationMs

	p.minQueryLoopDuration = time.Millisecond * time.Duration(targetedQueryTimeFromNow)

	// NOTE: the "clock_drift_ms" field carries the current time trim value.
	log.Debug("Dynamic query time",
		zap.Int64("avg_block_ms", p.averageBlockTimeMs),
		zap.Int64("targeted_query_ms", targetedQueryTimeFromNow),
		zap.Int64("clock_drift_ms", p.timeTrimMs),
	)
}

// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
Expand Down Expand Up @@ -187,17 +296,17 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui

ccp.log.Debug("Entering main query loop")

ticker := time.NewTicker(persistence.minQueryLoopDuration)

for {
if err := ccp.queryCycle(ctx, &persistence); err != nil {
return err
}
t := time.NewTimer(persistence.minQueryLoopDuration)
select {
case <-ctx.Done():
t.Stop()
return nil
case <-ticker.C:
ticker.Reset(persistence.minQueryLoopDuration)
case <-t.C:
continue
}
}
}
Expand Down Expand Up @@ -268,6 +377,12 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
zap.Int64("latest_height", persistence.latestHeight),
)

if persistence.latestHeight == persistence.latestQueriedBlock {
persistence.addTimeTrimMs(sameBlockTimeTrimAdditionMs)
persistence.minQueryLoopDuration = defaultMinQueryLoopDuration
return nil
}

// used at the end of the cycle to send signal to path processors to start processing if both chains are in sync and no new messages came in this cycle
firstTimeInSync := false

Expand All @@ -288,8 +403,6 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu

ibcHeaderCache := make(processor.IBCHeaderCache)

ppChanged := false

var latestHeader cosmos.CosmosIBCHeader

newLatestQueriedBlock := persistence.latestQueriedBlock
Expand All @@ -299,6 +412,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
var blockRes *ctypes.ResultBlockResults
var ibcHeader provider.IBCHeader
i := i
queryStartTime := time.Now()
eg.Go(func() (err error) {
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout)
defer cancelQueryCtx()
Expand All @@ -313,21 +427,24 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
})

if err := eg.Wait(); err != nil {
persistence.addTimeTrimMs(queryFailureTimeTrimAdditionMs)
ccp.log.Warn("Error querying block data", zap.Error(err))
break
}
persistence.addTimeTrimMs(querySuccessTimeTrimAdditionMs)

latestHeader = ibcHeader.(cosmos.CosmosIBCHeader)

heightUint64 := uint64(i)

latestBlockTime := latestHeader.SignedHeader.Time

ccp.latestBlock = provider.LatestBlock{
Height: heightUint64,
Time: latestHeader.SignedHeader.Time,
Time: latestBlockTime,
}

ibcHeaderCache[heightUint64] = latestHeader
ppChanged = true

for _, tx := range blockRes.TxsResults {
if tx.Code != 0 {
Expand All @@ -341,13 +458,12 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
}
}
newLatestQueriedBlock = i
}

if newLatestQueriedBlock == persistence.latestQueriedBlock {
return nil
persistence.dynamicBlockTime(ccp.log, queryStartTime, latestBlockTime)
persistence.latestQueriedBlockTime = latestBlockTime
}

if !ppChanged {
if newLatestQueriedBlock == persistence.latestQueriedBlock {
if firstTimeInSync {
for _, pp := range ccp.pathProcessors {
pp.ProcessBacklogIfReady()
Expand Down