From cd6c134472439cc5a1f7ec16f48bfa60b1dec1dc Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Thu, 14 Jul 2022 16:44:28 -0600 Subject: [PATCH 1/7] retry with backoff timeout for block queries in cosmoschainprocessor --- .../chains/cosmos/cosmos_chain_processor.go | 73 +++++++++++++++++-- 1 file changed, 67 insertions(+), 6 deletions(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index d28fe284b..d77de8fb6 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -67,6 +67,9 @@ const ( defaultMinQueryLoopDuration = 1 * time.Second inSyncNumBlocksThreshold = 2 + + blockQueryRetryDelay = 200 * time.Millisecond + blockQueryRetries = 10 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -113,6 +116,68 @@ func (ccp *CosmosChainProcessor) latestHeightWithRetry(ctx context.Context) (lat })) } +// blockResultsWithRetry will query for block data, retrying with a longer timeout in case of failure. +// It will delay by latestHeightQueryRetryDelay between attempts, up to latestHeightQueryRetries. +func (ccp *CosmosChainProcessor) queryBlockResultsWithRetry( + ctx context.Context, + height int64, +) (blockRes *ctypes.ResultBlockResults, err error) { + timeout := queryTimeout + return blockRes, retry.Do(func() error { + queryCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + var err error + blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &height) + return err + }, + retry.Context(ctx), + retry.Attempts(blockQueryRetries), + retry.Delay(blockQueryRetryDelay), + retry.DelayType(retry.FixedDelay), + retry.LastErrorOnly(true), + retry.OnRetry(func(n uint, err error) { + timeout *= 2 + ccp.log.Error( + "Failed to query block results", + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", blockQueryRetries), + zap.Error(err), + ) + }), + ) +} + +// ibcHeaderWithRetry will query for a block ibc header, retrying with a longer timeout in case of failure. +// It will delay by latestHeightQueryRetryDelay between attempts, up to latestHeightQueryRetries. +func (ccp *CosmosChainProcessor) queryIBCHeaderWithRetry( + ctx context.Context, + height int64, +) (ibcHeader provider.IBCHeader, err error) { + timeout := queryTimeout + return ibcHeader, retry.Do(func() error { + queryCtx, cancel := context.WithTimeout(ctx, queryTimeout) + defer cancel() + var err error + ibcHeader, err = ccp.chainProvider.IBCHeaderAtHeight(queryCtx, height) + return err + }, + retry.Context(ctx), + retry.Attempts(blockQueryRetries), + retry.Delay(blockQueryRetryDelay), + retry.DelayType(retry.BackOffDelay), + retry.LastErrorOnly(true), + retry.OnRetry(func(n uint, err error) { + timeout *= 2 + ccp.log.Error( + "Failed to query IBC header", + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", blockQueryRetries), + zap.Error(err), + ) + }), + ) +} + // clientState will return the most recent client state if client messages // have already been observed for the clientID, otherwise it will query for it. func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) { @@ -300,15 +365,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu var ibcHeader provider.IBCHeader i := i eg.Go(func() (err error) { - queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout) - defer cancelQueryCtx() - blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &i) + blockRes, err = ccp.queryBlockResultsWithRetry(ctx, i) return err }) eg.Go(func() (err error) { - queryCtx, cancelQueryCtx := context.WithTimeout(ctx, queryTimeout) - defer cancelQueryCtx() - ibcHeader, err = ccp.chainProvider.IBCHeaderAtHeight(queryCtx, i) + ibcHeader, err = ccp.queryIBCHeaderWithRetry(ctx, i) return err }) From ec440589933ce8e732ed78068519afc1155777b4 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Thu, 14 Jul 2022 18:01:44 -0600 Subject: [PATCH 2/7] fine tune retry --- relayer/chains/cosmos/cosmos_chain_processor.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index d77de8fb6..2a85fcf51 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -60,8 +60,7 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *cosmos.CosmosProvider) * } const ( - queryTimeout = 5 * time.Second - blockResultsQueryTimeout = 2 * time.Minute + queryTimeout = 10 * time.Second latestHeightQueryRetryDelay = 1 * time.Second latestHeightQueryRetries = 5 @@ -69,7 +68,9 @@ const ( inSyncNumBlocksThreshold = 2 blockQueryRetryDelay = 200 * time.Millisecond - blockQueryRetries = 10 + + // With doubling the 10 second timeout each time, this gives ~3 mins max timeout + blockQueryRetries = 4 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -136,9 +137,11 @@ func (ccp *CosmosChainProcessor) queryBlockResultsWithRetry( retry.DelayType(retry.FixedDelay), retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) { + // double timeout, longer timeouts are needed for larger blocks. timeout *= 2 ccp.log.Error( "Failed to query block results", + zap.Int64("height", height), zap.Uint("attempt", n+1), zap.Uint("max_attempts", blockQueryRetries), zap.Error(err), @@ -155,7 +158,7 @@ func (ccp *CosmosChainProcessor) queryIBCHeaderWithRetry( ) (ibcHeader provider.IBCHeader, err error) { timeout := queryTimeout return ibcHeader, retry.Do(func() error { - queryCtx, cancel := context.WithTimeout(ctx, queryTimeout) + queryCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() var err error ibcHeader, err = ccp.chainProvider.IBCHeaderAtHeight(queryCtx, height) @@ -168,8 +171,9 @@ func (ccp *CosmosChainProcessor) queryIBCHeaderWithRetry( retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) { timeout *= 2 - ccp.log.Error( + ccp.log.Warn( "Failed to query IBC header", + zap.Int64("height", height), zap.Uint("attempt", n+1), zap.Uint("max_attempts", blockQueryRetries), zap.Error(err), From 0d6977a8514c78bdd7267c8654de2e3479cb4888 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 15 Jul 2022 00:40:29 -0600 Subject: [PATCH 3/7] dynamic block query time targeting with clock drift tolerance --- .../chains/cosmos/cosmos_chain_processor.go | 191 ++++++++++++------ 1 file changed, 134 insertions(+), 57 deletions(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index 2a85fcf51..d4033bfdc 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -60,17 +60,46 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *cosmos.CosmosProvider) * } const ( - queryTimeout = 10 * time.Second + // Default timeout for initialization queries, block results query, light block query, + queryTimeout = 10 * time.Second + + // Back off delay between latest height query retries latestHeightQueryRetryDelay = 1 * time.Second - latestHeightQueryRetries = 5 + // Max retries for failed latest height queries + latestHeightQueryRetries = 5 + + // Initial query loop minimum duration. + // Rolling average block time will be used after startup. defaultMinQueryLoopDuration = 1 * time.Second - inSyncNumBlocksThreshold = 2 + // How many blocks behind the latest to consider in sync with the chain. + inSyncNumBlocksThreshold = 2 + + // Fixed delay between block query retry attempts. blockQueryRetryDelay = 200 * time.Millisecond - // With doubling the 10 second timeout each time, this gives ~3 mins max timeout + // With doubling the 10 second queryTimeout each time, this gives ~3 mins max timeout. blockQueryRetries = 4 + + // Each new delta block time will affect the rolling average by 1/n of this. + blockAverageSmoothing = 10 + + // Delta block times will be excluded from rolling average if they exceed this. + maxConsideredDeltaBlockTimeMs = 15000 + + // Scenarios for how much clock drift should be added to + // target ideal block query window. + + // Clock drift addition when the light block query fails + queryFailureClockDriftAdditionMs = 47 + + // Clock drift addition when the light block query succeeds + querySuccessClockDriftAdditionMs = -23 + + // Clock drift addition when the latest block is the same as + // the last successfully queried block + sameBlockClockDriftAdditionMs = 71 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -150,38 +179,6 @@ func (ccp *CosmosChainProcessor) queryBlockResultsWithRetry( ) } -// ibcHeaderWithRetry will query for a block ibc header, retrying with a longer timeout in case of failure. -// It will delay by latestHeightQueryRetryDelay between attempts, up to latestHeightQueryRetries. -func (ccp *CosmosChainProcessor) queryIBCHeaderWithRetry( - ctx context.Context, - height int64, -) (ibcHeader provider.IBCHeader, err error) { - timeout := queryTimeout - return ibcHeader, retry.Do(func() error { - queryCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - var err error - ibcHeader, err = ccp.chainProvider.IBCHeaderAtHeight(queryCtx, height) - return err - }, - retry.Context(ctx), - retry.Attempts(blockQueryRetries), - retry.Delay(blockQueryRetryDelay), - retry.DelayType(retry.BackOffDelay), - retry.LastErrorOnly(true), - retry.OnRetry(func(n uint, err error) { - timeout *= 2 - ccp.log.Warn( - "Failed to query IBC header", - zap.Int64("height", height), - zap.Uint("attempt", n+1), - zap.Uint("max_attempts", blockQueryRetries), - zap.Error(err), - ) - }), - ) -} - // clientState will return the most recent client state if client messages // have already been observed for the clientID, otherwise it will query for it. func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) { @@ -202,9 +199,79 @@ func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID strin // queryCyclePersistence hold the variables that should be retained across queryCycles. type queryCyclePersistence struct { - latestHeight int64 - latestQueriedBlock int64 - minQueryLoopDuration time.Duration + latestHeight int64 + latestQueriedBlock int64 + latestQueriedBlockTime time.Time + averageBlockTimeMs int64 + minQueryLoopDuration time.Duration + clockDriftMs int64 +} + +// addClockDriftMs is used to modify the clock drift for targeting the +// next ideal window for block queries. For example if the light block query fails +// because it is too early to be queried, clock drift should be added. If the query +// succeeds, clock drift should be removed, but not as much as is added for the error +// case. Clock drift should also be added when checking the latest height and it has +// not yet incremented. +func (p *queryCyclePersistence) addClockDriftMs(ms int64) { + p.clockDriftMs += ms + if p.clockDriftMs < 0 { + p.clockDriftMs = 0 + } else if p.clockDriftMs > p.averageBlockTimeMs { + // Should not add more than the average block time as a delay + // when targeting the next available block query. + p.clockDriftMs = p.averageBlockTimeMs + } +} + +// dynamicBlockTime targets the next ideal window for a block query to attempt +// to schedule block queries for the exact time when they are available. +func (p *queryCyclePersistence) dynamicBlockTime( + log *zap.Logger, + queryStart time.Time, + latestBlockTime time.Time, +) { + var deltaBlockTime int64 + + if p.latestQueriedBlockTime.IsZero() { + // latestQueriedBlockTime not yet initialized + return + } + + // deltaT between previous block time and latest block time. + deltaBlockTime = latestBlockTime.Sub(p.latestQueriedBlockTime).Milliseconds() + + if deltaBlockTime > maxConsideredDeltaBlockTimeMs { + // treat halts and upgrades as outliers + return + } + if p.averageBlockTimeMs == 0 { + // initialize average block time with first measurement of deltaT + p.averageBlockTimeMs = deltaBlockTime + } else { + // compute rolling average of deltaT + weightedComponent := p.averageBlockTimeMs * (blockAverageSmoothing - 1) + p.averageBlockTimeMs = int64(float64(weightedComponent+deltaBlockTime) / blockAverageSmoothing) + } + + // calculate deltaT between the block timestamp and when we initiated the query. + timeQueriedAfterBlockTime := queryStart.Sub(latestBlockTime).Milliseconds() + if timeQueriedAfterBlockTime <= 0 { + log.Debug("Unexpected state, query start is before latest block time but query succeeded") + return + } + + // also take into account older blocks, where timeQueriedAfterBlockTime > p.averageBlockTimeMs, by using remainder. + // clock drift tolerant using clockDriftMs trim value + targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) + p.clockDriftMs + + p.minQueryLoopDuration = time.Millisecond * time.Duration(targetedQueryTimeFromNow) + + log.Debug("Dynamic query time", + zap.Int64("avg_block_ms", p.averageBlockTimeMs), + zap.Int64("targeted_query_ms", targetedQueryTimeFromNow), + zap.Int64("clock_drift_ms", p.clockDriftMs), + ) } // Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors. @@ -256,17 +323,14 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui ccp.log.Debug("Entering main query loop") - ticker := time.NewTicker(persistence.minQueryLoopDuration) - for { - if err := ccp.queryCycle(ctx, &persistence); err != nil { - return err - } select { case <-ctx.Done(): return nil - case <-ticker.C: - ticker.Reset(persistence.minQueryLoopDuration) + case <-time.After(persistence.minQueryLoopDuration): + if err := ccp.queryCycle(ctx, &persistence); err != nil { + return err + } } } } @@ -337,6 +401,12 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu zap.Int64("latest_height", persistence.latestHeight), ) + if persistence.latestHeight == persistence.latestQueriedBlock { + persistence.addClockDriftMs(sameBlockClockDriftAdditionMs) + persistence.minQueryLoopDuration += 100 * time.Millisecond + return nil + } + // used at the end of the cycle to send signal to path processors to start processing if both chains are in sync and no new messages came in this cycle firstTimeInSync := false @@ -357,8 +427,6 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu ibcHeaderCache := make(processor.IBCHeaderCache) - ppChanged := false - var latestHeader cosmos.CosmosIBCHeader newLatestQueriedBlock := persistence.latestQueriedBlock @@ -368,12 +436,21 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu var blockRes *ctypes.ResultBlockResults var ibcHeader provider.IBCHeader i := i - eg.Go(func() (err error) { + queryStartTime := time.Now() + eg.Go(func() error { blockRes, err = ccp.queryBlockResultsWithRetry(ctx, i) return err }) - eg.Go(func() (err error) { - ibcHeader, err = ccp.queryIBCHeaderWithRetry(ctx, i) + eg.Go(func() error { + queryCtx, cancel := context.WithTimeout(ctx, queryTimeout) + defer cancel() + var err error + ibcHeader, err = ccp.chainProvider.IBCHeaderAtHeight(queryCtx, i) + if err != nil { + persistence.addClockDriftMs(queryFailureClockDriftAdditionMs) + } else { + persistence.addClockDriftMs(querySuccessClockDriftAdditionMs) + } return err }) @@ -386,13 +463,14 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu heightUint64 := uint64(i) + latestBlockTime := latestHeader.SignedHeader.Time + ccp.latestBlock = provider.LatestBlock{ Height: heightUint64, - Time: latestHeader.SignedHeader.Time, + Time: latestBlockTime, } ibcHeaderCache[heightUint64] = latestHeader - ppChanged = true for _, tx := range blockRes.TxsResults { if tx.Code != 0 { @@ -406,13 +484,12 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu } } newLatestQueriedBlock = i - } - if newLatestQueriedBlock == persistence.latestQueriedBlock { - return nil + persistence.dynamicBlockTime(ccp.log, queryStartTime, latestBlockTime) + persistence.latestQueriedBlockTime = latestBlockTime } - if !ppChanged { + if newLatestQueriedBlock == persistence.latestQueriedBlock { if firstTimeInSync { for _, pp := range ccp.pathProcessors { pp.ProcessBacklogIfReady() From 1b5087f3ccfd49d628af1557049a8a8a1f084ecc Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 15 Jul 2022 01:15:12 -0600 Subject: [PATCH 4/7] account for query time --- relayer/chains/cosmos/cosmos_chain_processor.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index d4033bfdc..c74586d1b 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -261,9 +261,12 @@ func (p *queryCyclePersistence) dynamicBlockTime( return } + queryDurationMs := time.Since(queryStart).Milliseconds() + // also take into account older blocks, where timeQueriedAfterBlockTime > p.averageBlockTimeMs, by using remainder. // clock drift tolerant using clockDriftMs trim value - targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) + p.clockDriftMs + targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) - + queryDurationMs + p.clockDriftMs p.minQueryLoopDuration = time.Millisecond * time.Duration(targetedQueryTimeFromNow) From efde1cc98d10748d47aeb7c3a659d8e0e4e9d6a0 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 15 Jul 2022 01:29:25 -0600 Subject: [PATCH 5/7] remove unnecessary retry --- .../chains/cosmos/cosmos_chain_processor.go | 64 +++++-------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index c74586d1b..e87f3c475 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -60,9 +60,12 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *cosmos.CosmosProvider) * } const ( - // Default timeout for initialization queries, block results query, light block query, + // Timeout for initialization queries and light block query, queryTimeout = 10 * time.Second + // Timeout for block results query. Longer timeout needed for large blocks. + blockResultsQueryTimeout = 2 * time.Minute + // Back off delay between latest height query retries latestHeightQueryRetryDelay = 1 * time.Second @@ -91,10 +94,10 @@ const ( // Scenarios for how much clock drift should be added to // target ideal block query window. - // Clock drift addition when the light block query fails + // Clock drift addition when a block query fails queryFailureClockDriftAdditionMs = 47 - // Clock drift addition when the light block query succeeds + // Clock drift addition when the block queries succeeds querySuccessClockDriftAdditionMs = -23 // Clock drift addition when the latest block is the same as @@ -146,39 +149,6 @@ func (ccp *CosmosChainProcessor) latestHeightWithRetry(ctx context.Context) (lat })) } -// blockResultsWithRetry will query for block data, retrying with a longer timeout in case of failure. -// It will delay by latestHeightQueryRetryDelay between attempts, up to latestHeightQueryRetries. -func (ccp *CosmosChainProcessor) queryBlockResultsWithRetry( - ctx context.Context, - height int64, -) (blockRes *ctypes.ResultBlockResults, err error) { - timeout := queryTimeout - return blockRes, retry.Do(func() error { - queryCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - var err error - blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &height) - return err - }, - retry.Context(ctx), - retry.Attempts(blockQueryRetries), - retry.Delay(blockQueryRetryDelay), - retry.DelayType(retry.FixedDelay), - retry.LastErrorOnly(true), - retry.OnRetry(func(n uint, err error) { - // double timeout, longer timeouts are needed for larger blocks. - timeout *= 2 - ccp.log.Error( - "Failed to query block results", - zap.Int64("height", height), - zap.Uint("attempt", n+1), - zap.Uint("max_attempts", blockQueryRetries), - zap.Error(err), - ) - }), - ) -} - // clientState will return the most recent client state if client messages // have already been observed for the clientID, otherwise it will query for it. func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) { @@ -208,7 +178,7 @@ type queryCyclePersistence struct { } // addClockDriftMs is used to modify the clock drift for targeting the -// next ideal window for block queries. For example if the light block query fails +// next ideal window for block queries. For example if a block query fails // because it is too early to be queried, clock drift should be added. If the query // succeeds, clock drift should be removed, but not as much as is added for the error // case. Clock drift should also be added when checking the latest height and it has @@ -440,27 +410,25 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu var ibcHeader provider.IBCHeader i := i queryStartTime := time.Now() - eg.Go(func() error { - blockRes, err = ccp.queryBlockResultsWithRetry(ctx, i) + eg.Go(func() (err error) { + queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout) + defer cancelQueryCtx() + blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &i) return err }) - eg.Go(func() error { - queryCtx, cancel := context.WithTimeout(ctx, queryTimeout) - defer cancel() - var err error + eg.Go(func() (err error) { + queryCtx, cancelQueryCtx := context.WithTimeout(ctx, queryTimeout) + defer cancelQueryCtx() ibcHeader, err = ccp.chainProvider.IBCHeaderAtHeight(queryCtx, i) - if err != nil { - persistence.addClockDriftMs(queryFailureClockDriftAdditionMs) - } else { - persistence.addClockDriftMs(querySuccessClockDriftAdditionMs) - } return err }) if err := eg.Wait(); err != nil { + persistence.addClockDriftMs(queryFailureClockDriftAdditionMs) ccp.log.Warn("Error querying block data", zap.Error(err)) break } + persistence.addClockDriftMs(querySuccessClockDriftAdditionMs) latestHeader = ibcHeader.(cosmos.CosmosIBCHeader) From 84c761030eb9dbad937ba8681d9530bee9b6c68e Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 15 Jul 2022 02:33:14 -0600 Subject: [PATCH 6/7] fine tune clock drift --- relayer/chains/cosmos/cosmos_chain_processor.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index e87f3c475..ff8252eb9 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -95,14 +95,14 @@ const ( // target ideal block query window. // Clock drift addition when a block query fails - queryFailureClockDriftAdditionMs = 47 + queryFailureClockDriftAdditionMs = 73 // Clock drift addition when the block queries succeeds - querySuccessClockDriftAdditionMs = -23 + querySuccessClockDriftAdditionMs = -11 // Clock drift addition when the latest block is the same as // the last successfully queried block - sameBlockClockDriftAdditionMs = 71 + sameBlockClockDriftAdditionMs = 97 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -235,8 +235,8 @@ func (p *queryCyclePersistence) dynamicBlockTime( // also take into account older blocks, where timeQueriedAfterBlockTime > p.averageBlockTimeMs, by using remainder. // clock drift tolerant using clockDriftMs trim value - targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) - - queryDurationMs + p.clockDriftMs + targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) + + p.clockDriftMs - queryDurationMs p.minQueryLoopDuration = time.Millisecond * time.Duration(targetedQueryTimeFromNow) @@ -376,7 +376,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu if persistence.latestHeight == persistence.latestQueriedBlock { persistence.addClockDriftMs(sameBlockClockDriftAdditionMs) - persistence.minQueryLoopDuration += 100 * time.Millisecond + persistence.minQueryLoopDuration = defaultMinQueryLoopDuration return nil } From 6e36e8b909af86d9aea70413612509805d4d5f90 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 18 Jul 2022 10:49:02 -0600 Subject: [PATCH 7/7] Rename clockDrift to timeTrim. Use timer and stop for ctx.Done --- .../chains/cosmos/cosmos_chain_processor.go | 59 ++++++++++--------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index ff8252eb9..2ed6b669c 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -91,18 +91,18 @@ const ( // Delta block times will be excluded from rolling average if they exceed this. maxConsideredDeltaBlockTimeMs = 15000 - // Scenarios for how much clock drift should be added to + // Scenarios for how much time trim should be added to // target ideal block query window. - // Clock drift addition when a block query fails - queryFailureClockDriftAdditionMs = 73 + // Time trim addition when a block query fails + queryFailureTimeTrimAdditionMs = 73 - // Clock drift addition when the block queries succeeds - querySuccessClockDriftAdditionMs = -11 + // Time trim addition when the block queries succeeds + querySuccessTimeTrimAdditionMs = -31 - // Clock drift addition when the latest block is the same as + // Time trim addition when the latest block is the same as // the last successfully queried block - sameBlockClockDriftAdditionMs = 97 + sameBlockTimeTrimAdditionMs = 97 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -174,23 +174,23 @@ type queryCyclePersistence struct { latestQueriedBlockTime time.Time averageBlockTimeMs int64 minQueryLoopDuration time.Duration - clockDriftMs int64 + timeTrimMs int64 } -// addClockDriftMs is used to modify the clock drift for targeting the +// addTimeTrimMs is used to modify the time trim for targeting the // next ideal window for block queries. For example if a block query fails -// because it is too early to be queried, clock drift should be added. If the query -// succeeds, clock drift should be removed, but not as much as is added for the error -// case. Clock drift should also be added when checking the latest height and it has +// because it is too early to be queried, time trim should be added. If the query +// succeeds, time trim should be removed, but not as much as is added for the error +// case. Time trim should also be added when checking the latest height and it has // not yet incremented. -func (p *queryCyclePersistence) addClockDriftMs(ms int64) { - p.clockDriftMs += ms - if p.clockDriftMs < 0 { - p.clockDriftMs = 0 - } else if p.clockDriftMs > p.averageBlockTimeMs { +func (p *queryCyclePersistence) addTimeTrimMs(ms int64) { + p.timeTrimMs += ms + if p.timeTrimMs < 0 { + p.timeTrimMs = 0 + } else if p.timeTrimMs > p.averageBlockTimeMs { // Should not add more than the average block time as a delay // when targeting the next available block query. - p.clockDriftMs = p.averageBlockTimeMs + p.timeTrimMs = p.averageBlockTimeMs } } @@ -234,16 +234,16 @@ func (p *queryCyclePersistence) dynamicBlockTime( queryDurationMs := time.Since(queryStart).Milliseconds() // also take into account older blocks, where timeQueriedAfterBlockTime > p.averageBlockTimeMs, by using remainder. - // clock drift tolerant using clockDriftMs trim value + // time trim tolerant using timeTrimMs trim value targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) + - p.clockDriftMs - queryDurationMs + p.timeTrimMs - queryDurationMs p.minQueryLoopDuration = time.Millisecond * time.Duration(targetedQueryTimeFromNow) log.Debug("Dynamic query time", zap.Int64("avg_block_ms", p.averageBlockTimeMs), zap.Int64("targeted_query_ms", targetedQueryTimeFromNow), - zap.Int64("clock_drift_ms", p.clockDriftMs), + zap.Int64("clock_drift_ms", p.timeTrimMs), ) } @@ -297,13 +297,16 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui ccp.log.Debug("Entering main query loop") for { + if err := ccp.queryCycle(ctx, &persistence); err != nil { + return err + } + t := time.NewTimer(persistence.minQueryLoopDuration) select { case <-ctx.Done(): + t.Stop() return nil - case <-time.After(persistence.minQueryLoopDuration): - if err := ccp.queryCycle(ctx, &persistence); err != nil { - return err - } + case <-t.C: + continue } } } @@ -375,7 +378,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu ) if persistence.latestHeight == persistence.latestQueriedBlock { - persistence.addClockDriftMs(sameBlockClockDriftAdditionMs) + persistence.addTimeTrimMs(sameBlockTimeTrimAdditionMs) persistence.minQueryLoopDuration = defaultMinQueryLoopDuration return nil } @@ -424,11 +427,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu }) if err := eg.Wait(); err != nil { - persistence.addClockDriftMs(queryFailureClockDriftAdditionMs) + persistence.addTimeTrimMs(queryFailureTimeTrimAdditionMs) ccp.log.Warn("Error querying block data", zap.Error(err)) break } - persistence.addClockDriftMs(querySuccessClockDriftAdditionMs) + persistence.addTimeTrimMs(querySuccessTimeTrimAdditionMs) latestHeader = ibcHeader.(cosmos.CosmosIBCHeader)