diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index d28fe284b..2ed6b669c 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -60,13 +60,49 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *cosmos.CosmosProvider) * } const ( - queryTimeout = 5 * time.Second - blockResultsQueryTimeout = 2 * time.Minute + // Timeout for initialization queries and light block query, + queryTimeout = 10 * time.Second + + // Timeout for block results query. Longer timeout needed for large blocks. + blockResultsQueryTimeout = 2 * time.Minute + + // Back off delay between latest height query retries latestHeightQueryRetryDelay = 1 * time.Second - latestHeightQueryRetries = 5 + // Max retries for failed latest height queries + latestHeightQueryRetries = 5 + + // Initial query loop minimum duration. + // Rolling average block time will be used after startup. defaultMinQueryLoopDuration = 1 * time.Second - inSyncNumBlocksThreshold = 2 + + // How many blocks behind the latest to consider in sync with the chain. + inSyncNumBlocksThreshold = 2 + + // Fixed delay between block query retry attempts. + blockQueryRetryDelay = 200 * time.Millisecond + + // With doubling the 10 second queryTimeout each time, this gives ~3 mins max timeout. + blockQueryRetries = 4 + + // Each new delta block time will affect the rolling average by 1/n of this. + blockAverageSmoothing = 10 + + // Delta block times will be excluded from rolling average if they exceed this. + maxConsideredDeltaBlockTimeMs = 15000 + + // Scenarios for how much time trim should be added to + // target ideal block query window. + + // Time trim addition when a block query fails + queryFailureTimeTrimAdditionMs = 73 + + // Time trim addition when the block queries succeeds + querySuccessTimeTrimAdditionMs = -31 + + // Time trim addition when the latest block is the same as + // the last successfully queried block + sameBlockTimeTrimAdditionMs = 97 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -133,9 +169,82 @@ func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID strin // queryCyclePersistence hold the variables that should be retained across queryCycles. type queryCyclePersistence struct { - latestHeight int64 - latestQueriedBlock int64 - minQueryLoopDuration time.Duration + latestHeight int64 + latestQueriedBlock int64 + latestQueriedBlockTime time.Time + averageBlockTimeMs int64 + minQueryLoopDuration time.Duration + timeTrimMs int64 +} + +// addTimeTrimMs is used to modify the time trim for targeting the +// next ideal window for block queries. For example if a block query fails +// because it is too early to be queried, time trim should be added. If the query +// succeeds, time trim should be removed, but not as much as is added for the error +// case. Time trim should also be added when checking the latest height and it has +// not yet incremented. +func (p *queryCyclePersistence) addTimeTrimMs(ms int64) { + p.timeTrimMs += ms + if p.timeTrimMs < 0 { + p.timeTrimMs = 0 + } else if p.timeTrimMs > p.averageBlockTimeMs { + // Should not add more than the average block time as a delay + // when targeting the next available block query. + p.timeTrimMs = p.averageBlockTimeMs + } +} + +// dynamicBlockTime targets the next ideal window for a block query to attempt +// to schedule block queries for the exact time when they are available. +func (p *queryCyclePersistence) dynamicBlockTime( + log *zap.Logger, + queryStart time.Time, + latestBlockTime time.Time, +) { + var deltaBlockTime int64 + + if p.latestQueriedBlockTime.IsZero() { + // latestQueriedBlockTime not yet initialized + return + } + + // deltaT between previous block time and latest block time. + deltaBlockTime = latestBlockTime.Sub(p.latestQueriedBlockTime).Milliseconds() + + if deltaBlockTime > maxConsideredDeltaBlockTimeMs { + // treat halts and upgrades as outliers + return + } + if p.averageBlockTimeMs == 0 { + // initialize average block time with first measurement of deltaT + p.averageBlockTimeMs = deltaBlockTime + } else { + // compute rolling average of deltaT + weightedComponent := p.averageBlockTimeMs * (blockAverageSmoothing - 1) + p.averageBlockTimeMs = int64(float64(weightedComponent+deltaBlockTime) / blockAverageSmoothing) + } + + // calculate deltaT between the block timestamp and when we initiated the query. + timeQueriedAfterBlockTime := queryStart.Sub(latestBlockTime).Milliseconds() + if timeQueriedAfterBlockTime <= 0 { + log.Debug("Unexpected state, query start is before latest block time but query succeeded") + return + } + + queryDurationMs := time.Since(queryStart).Milliseconds() + + // also take into account older blocks, where timeQueriedAfterBlockTime > p.averageBlockTimeMs, by using remainder. + // time trim tolerant using timeTrimMs trim value + targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) + + p.timeTrimMs - queryDurationMs + + p.minQueryLoopDuration = time.Millisecond * time.Duration(targetedQueryTimeFromNow) + + log.Debug("Dynamic query time", + zap.Int64("avg_block_ms", p.averageBlockTimeMs), + zap.Int64("targeted_query_ms", targetedQueryTimeFromNow), + zap.Int64("clock_drift_ms", p.timeTrimMs), + ) } // Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors. @@ -187,17 +296,17 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui ccp.log.Debug("Entering main query loop") - ticker := time.NewTicker(persistence.minQueryLoopDuration) - for { if err := ccp.queryCycle(ctx, &persistence); err != nil { return err } + t := time.NewTimer(persistence.minQueryLoopDuration) select { case <-ctx.Done(): + t.Stop() return nil - case <-ticker.C: - ticker.Reset(persistence.minQueryLoopDuration) + case <-t.C: + continue } } } @@ -268,6 +377,12 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu zap.Int64("latest_height", persistence.latestHeight), ) + if persistence.latestHeight == persistence.latestQueriedBlock { + persistence.addTimeTrimMs(sameBlockTimeTrimAdditionMs) + persistence.minQueryLoopDuration = defaultMinQueryLoopDuration + return nil + } + // used at the end of the cycle to send signal to path processors to start processing if both chains are in sync and no new messages came in this cycle firstTimeInSync := false @@ -288,8 +403,6 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu ibcHeaderCache := make(processor.IBCHeaderCache) - ppChanged := false - var latestHeader cosmos.CosmosIBCHeader newLatestQueriedBlock := persistence.latestQueriedBlock @@ -299,6 +412,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu var blockRes *ctypes.ResultBlockResults var ibcHeader provider.IBCHeader i := i + queryStartTime := time.Now() eg.Go(func() (err error) { queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout) defer cancelQueryCtx() @@ -313,21 +427,24 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu }) if err := eg.Wait(); err != nil { + persistence.addTimeTrimMs(queryFailureTimeTrimAdditionMs) ccp.log.Warn("Error querying block data", zap.Error(err)) break } + persistence.addTimeTrimMs(querySuccessTimeTrimAdditionMs) latestHeader = ibcHeader.(cosmos.CosmosIBCHeader) heightUint64 := uint64(i) + latestBlockTime := latestHeader.SignedHeader.Time + ccp.latestBlock = provider.LatestBlock{ Height: heightUint64, - Time: latestHeader.SignedHeader.Time, + Time: latestBlockTime, } ibcHeaderCache[heightUint64] = latestHeader - ppChanged = true for _, tx := range blockRes.TxsResults { if tx.Code != 0 { @@ -341,13 +458,12 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu } } newLatestQueriedBlock = i - } - if newLatestQueriedBlock == persistence.latestQueriedBlock { - return nil + persistence.dynamicBlockTime(ccp.log, queryStartTime, latestBlockTime) + persistence.latestQueriedBlockTime = latestBlockTime } - if !ppChanged { + if newLatestQueriedBlock == persistence.latestQueriedBlock { if firstTimeInSync { for _, pp := range ccp.pathProcessors { pp.ProcessBacklogIfReady()