From 1768160e8f8a446afba34100ef2c83fa91fb1106 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Tue, 16 Apr 2024 14:54:09 +0200 Subject: [PATCH 1/3] Fixes: timer resource leak where it can be garbage collected only after it fires --- ethmonitor/ethmonitor.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 8450bdf5..fce60e85 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -334,7 +334,6 @@ func (m *Monitor) listenNewHead() <-chan uint64 { goto reconnect } - blockTimer := time.NewTimer(3 * m.options.ExpectedBlockInterval) for { select { case <-m.ctx.Done(): @@ -352,7 +351,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { streamingErrorLastTime = time.Now() goto reconnect - case <-blockTimer.C: + case <-time.After(3 * m.options.ExpectedBlockInterval): // if we haven't received a new block in a while, we'll reconnect. m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..") sub.Unsubscribe() @@ -361,8 +360,6 @@ func (m *Monitor) listenNewHead() <-chan uint64 { goto reconnect case newHead := <-newHeads: - blockTimer.Reset(3 * m.options.ExpectedBlockInterval) - latestHeadBlock.Store(newHead.Number.Uint64()) select { case nextBlock <- newHead.Number.Uint64(): @@ -395,6 +392,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 { case <-m.ctx.Done(): // if we're done, we'll close the nextBlock channel close(nextBlock) + retryStreamingTimer.Stop() return case <-time.After(time.Duration(m.pollInterval.Load())): From 80d4cad2b22411cfb23bbb081cca9cfc55689a9d Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Tue, 16 Apr 2024 15:38:47 +0200 Subject: [PATCH 2/3] fix: blockTimer --- ethmonitor/ethmonitor.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index fce60e85..0a3e86d4 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -29,8 +29,8 @@ var DefaultOptions = Options{ Logger: logger.NewLogger(logger.LogLevel_WARN), PollingInterval: 1500 * time.Millisecond, ExpectedBlockInterval: 15 * time.Second, - StreamingErrorResetInterval: 15 * time.Minute, - StreamingRetryAfter: 20 * time.Minute, + StreamingErrorResetInterval: 1 * time.Minute, + StreamingRetryAfter: 2 * time.Minute, StreamingErrNumToSwitchToPolling: 3, UnsubscribeOnStop: false, Timeout: 20 * time.Second, @@ -335,11 +335,14 @@ func (m *Monitor) listenNewHead() <-chan uint64 { } for { + blockTimer := time.NewTimer(3 * m.options.ExpectedBlockInterval) + select { case <-m.ctx.Done(): // if we're done, we'll unsubscribe and close the nextBlock channel sub.Unsubscribe() close(nextBlock) + blockTimer.Stop() return case err := <-sub.Err(): @@ -349,9 +352,10 @@ func (m *Monitor) listenNewHead() <-chan uint64 { sub.Unsubscribe() streamingErrorLastTime = time.Now() + blockTimer.Stop() goto reconnect - case <-time.After(3 * m.options.ExpectedBlockInterval): + case <-blockTimer.C: // if we haven't received a new block in a while, we'll reconnect. m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..") sub.Unsubscribe() @@ -360,6 +364,8 @@ func (m *Monitor) listenNewHead() <-chan uint64 { goto reconnect case newHead := <-newHeads: + blockTimer.Stop() + latestHeadBlock.Store(newHead.Number.Uint64()) select { case nextBlock <- newHead.Number.Uint64(): From add1519d6fa51f627d8bb84c86bc88c22ffa0e59 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Tue, 16 Apr 2024 15:45:34 +0200 Subject: [PATCH 3/3] restore defaults --- ethmonitor/ethmonitor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 0a3e86d4..1c01ad54 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -29,8 +29,8 @@ var DefaultOptions = Options{ Logger: logger.NewLogger(logger.LogLevel_WARN), PollingInterval: 1500 * time.Millisecond, ExpectedBlockInterval: 15 * time.Second, - StreamingErrorResetInterval: 1 * time.Minute, - StreamingRetryAfter: 2 * time.Minute, + StreamingErrorResetInterval: 15 * time.Minute, + StreamingRetryAfter: 20 * time.Minute, StreamingErrNumToSwitchToPolling: 3, UnsubscribeOnStop: false, Timeout: 20 * time.Second,