Skip to content

Commit

Permalink
retry streaming after given time
Browse files Browse the repository at this point in the history
  • Loading branch information
marino39 committed Apr 15, 2024
1 parent 4998f7f commit 21173ac
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ import (
)

var DefaultOptions = Options{
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1500 * time.Millisecond,
ExpectedBlockInterval: 15 * time.Second,
StreamingErrorResetInterval: 15 * time.Minute,
ErrorNumToSwitchToPolling: 6,
UnsubscribeOnStop: false,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
TrailNumBlocksBehindHead: 0, // latest
BlockRetentionLimit: 200,
WithLogs: false,
LogTopics: []common.Hash{}, // all logs
DebugLogging: false,
CacheExpiry: 300 * time.Second,
Alerter: util.NoopAlerter(),
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1500 * time.Millisecond,
ExpectedBlockInterval: 15 * time.Second,
StreamingErrorResetInterval: 15 * time.Minute,
StreamingRetryAfter: 1 * time.Hour,
StreamingErrNumToSwitchToPolling: 3,
UnsubscribeOnStop: false,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
TrailNumBlocksBehindHead: 0, // latest
BlockRetentionLimit: 200,
WithLogs: false,
LogTopics: []common.Hash{}, // all logs
DebugLogging: false,
CacheExpiry: 300 * time.Second,
Alerter: util.NoopAlerter(),
}

type Options struct {
Expand All @@ -56,8 +57,11 @@ type Options struct {
// StreamingErrorResetInterval is the time to reset the streaming error count
StreamingErrorResetInterval time.Duration

// ErrorNumToSwitchToPolling is the number of errors before switching to polling
ErrorNumToSwitchToPolling int
// StreamingRetryAfter is the time to wait before retrying the streaming again
StreamingRetryAfter time.Duration

// StreamingErrNumToSwitchToPolling is the number of errors before switching to polling
StreamingErrNumToSwitchToPolling int

// Auto-unsubscribe on monitor stop or error
UnsubscribeOnStop bool
Expand Down Expand Up @@ -304,12 +308,17 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
var streamingErrorLastTime time.Time

reconnect:
// reset the latest head block
latestHeadBlock.Store(0)

// if we have too many streaming errors, we'll switch to polling
streamingErrorCount++
if time.Since(streamingErrorLastTime) > m.options.StreamingErrorResetInterval {
streamingErrorCount = 0
}

if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.ErrorNumToSwitchToPolling {
// listen for new heads either via streaming or polling
if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.StreamingErrNumToSwitchToPolling {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
m.log.Info("ethmonitor: starting stream head listener")
Expand Down Expand Up @@ -366,11 +375,19 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
// We default to polling if streaming is not enabled
m.log.Info("ethmonitor: starting poll head listener")

retryStreamingTimer := time.NewTimer(m.options.StreamingRetryAfter)
for {
select {
case <-m.ctx.Done():
// if we're done, we'll close the nextBlock channel
close(nextBlock)
return

case <-retryStreamingTimer.C:
// retry streaming
m.log.Info("ethmonitor: retrying streaming...")
goto reconnect

case <-time.After(time.Duration(m.pollInterval.Load())):
nextBlock <- 0
}
Expand Down

0 comments on commit 21173ac

Please sign in to comment.