Skip to content

Commit

Permalink
[DEC-2147] - De-escalate unresponsive daemon panics to errors (#546)
Browse files Browse the repository at this point in the history
  • Loading branch information
clemire authored Oct 10, 2023
1 parent eb4bb1c commit e2f96b8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 26 deletions.
2 changes: 1 addition & 1 deletion protocol/daemons/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewServer(
gsrv: grpcServer,
fileHandler: fileHandler,
socketAddress: socketAddress,
updateMonitor: types.NewUpdateFrequencyMonitor(),
updateMonitor: types.NewUpdateFrequencyMonitor(types.DaemonStartupGracePeriod, logger),
}
stoppable.RegisterServiceForTestCleanup(uniqueTestIdentifier, srv)
return srv
Expand Down
5 changes: 3 additions & 2 deletions protocol/daemons/server/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ const (
DaemonStartupGracePeriod = 60 * time.Second

// MaximumLoopDelayMultiple defines the maximum acceptable update delay for a daemon as a multiple of the
// daemon's loop delay.
MaximumLoopDelayMultiple = 3
// daemon's loop delay. This is set to 8 to have generous headroom to ignore errors from the liquidations daemon,
// which we have sometimes seen to take up to ~10s to respond.
MaximumLoopDelayMultiple = 8

LiquidationsDaemonServiceName = "liquidations-daemon"
PricefeedDaemonServiceName = "pricefeed-daemon"
Expand Down
33 changes: 26 additions & 7 deletions protocol/daemons/server/types/update_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"fmt"
"github.com/cometbft/cometbft/libs/log"
"sync"
"time"
)
Expand All @@ -12,7 +13,7 @@ type updateMetadata struct {
}

// UpdateMonitor monitors the update frequency of daemon services. If a daemon service does not respond within
// the maximum acceptable update delay set when the daemon is registered, the monitor will panic and halt the
// the maximum acceptable update delay set when the daemon is registered, the monitor will log an error and halt the
// protocol. This was judged to be the best solution for network performance because it prevents any validator from
// participating in the network at all if a daemon service is not responding.
type UpdateMonitor struct {
Expand All @@ -26,12 +27,18 @@ type UpdateMonitor struct {
disabled bool
// lock is used to synchronize access to the monitor.
lock sync.Mutex

// These fields are initialized in NewUpdateFrequencyMonitor and are not modified after initialization.
logger log.Logger
daemonStartupGracePeriod time.Duration
}

// NewUpdateFrequencyMonitor creates a new update frequency monitor.
func NewUpdateFrequencyMonitor() *UpdateMonitor {
func NewUpdateFrequencyMonitor(daemonStartupGracePeriod time.Duration, logger log.Logger) *UpdateMonitor {
return &UpdateMonitor{
serviceToUpdateMetadata: make(map[string]updateMetadata),
serviceToUpdateMetadata: make(map[string]updateMetadata),
logger: logger,
daemonStartupGracePeriod: daemonStartupGracePeriod,
}
}

Expand Down Expand Up @@ -79,22 +86,34 @@ func (ufm *UpdateMonitor) RegisterDaemonServiceWithCallback(
}

ufm.serviceToUpdateMetadata[service] = updateMetadata{
timer: time.AfterFunc(DaemonStartupGracePeriod+maximumAcceptableUpdateDelay, callback),
timer: time.AfterFunc(ufm.daemonStartupGracePeriod+maximumAcceptableUpdateDelay, callback),
updateFrequency: maximumAcceptableUpdateDelay,
}
return nil
}

// PanicServiceNotResponding returns a function that panics with a message indicating that the specified daemon
// service is not responding. This is ideal for creating the callback function when registering a daemon service.
// service is not responding. This is ideal for creating a callback function when registering a daemon service.
func PanicServiceNotResponding(service string) func() {
return func() {
panic(fmt.Sprintf("%v daemon not responding", service))
}
}

// LogErrorServiceNotResponding returns a function that logs an error indicating that the specified daemon service
// is not responding. This is ideal for creating a callback function when registering a daemon service.
func LogErrorServiceNotResponding(service string, logger log.Logger) func() {
return func() {
logger.Error(
"daemon not responding",
"service",
service,
)
}
}

// RegisterDaemonService registers a new daemon service with the update frequency monitor. If the daemon service
// fails to respond within the maximum acceptable update delay, the monitor will execute a panic and halt the protocol.
// fails to respond within the maximum acceptable update delay, the monitor will log an error.
// This method is synchronized. The method an error if the daemon service was already registered or the monitor has
// already been stopped.
func (ufm *UpdateMonitor) RegisterDaemonService(
Expand All @@ -104,7 +123,7 @@ func (ufm *UpdateMonitor) RegisterDaemonService(
return ufm.RegisterDaemonServiceWithCallback(
service,
maximumAcceptableUpdateDelay,
PanicServiceNotResponding(service),
LogErrorServiceNotResponding(service, ufm.logger),
)
}

Expand Down
73 changes: 58 additions & 15 deletions protocol/daemons/server/types/update_monitor_test.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,59 @@
package types
package types_test

import (
"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"sync/atomic"
"testing"
"time"
)

var (
zeroDuration = 0 * time.Second
)

func createTestMonitor() (*types.UpdateMonitor, *mocks.Logger) {
logger := &mocks.Logger{}
return types.NewUpdateFrequencyMonitor(zeroDuration, logger), logger
}

// The following tests may still intermittently fail on an overloaded system as they rely
// on `time.Sleep`, which is not guaranteed to wake up after the specified amount of time.

func TestRegisterDaemonService_Success(t *testing.T) {
ufm := NewUpdateFrequencyMonitor()
ufm, logger := createTestMonitor()
err := ufm.RegisterDaemonService("test-service", 200*time.Millisecond)
require.NoError(t, err)

// As long as responses come in before the 200ms deadline, no panic should occur.
// As long as responses come in before the 200ms deadline, no errors should be logged.
time.Sleep(80 * time.Millisecond)
require.NoError(t, ufm.RegisterValidResponse("test-service"))
time.Sleep(80 * time.Millisecond)
require.NoError(t, ufm.RegisterValidResponse("test-service"))
time.Sleep(80 * time.Millisecond)

ufm.Stop()
// Assert: no calls to the logger were made.
mock.AssertExpectationsForObjects(t, logger)
}

func TestRegisterDaemonService_SuccessfullyLogsError(t *testing.T) {
ufm, logger := createTestMonitor()
logger.On("Error", "daemon not responding", "service", "test-service").Once().Return()
err := ufm.RegisterDaemonService("test-service", 1*time.Millisecond)
require.NoError(t, err)
time.Sleep(2 * time.Millisecond)
ufm.Stop()

// Assert: the logger was called with the expected arguments.
mock.AssertExpectationsForObjects(t, logger)
}

func TestRegisterDaemonServiceWithCallback_Success(t *testing.T) {
callbackCalled := atomic.Bool{}

ufm := NewUpdateFrequencyMonitor()
ufm, _ := createTestMonitor()
err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() {
callbackCalled.Store(true)
})
Expand All @@ -47,29 +72,33 @@ func TestRegisterDaemonServiceWithCallback_Success(t *testing.T) {
}

func TestRegisterDaemonService_DoubleRegistrationFails(t *testing.T) {
ufm := NewUpdateFrequencyMonitor()
ufm, logger := createTestMonitor()

err := ufm.RegisterDaemonService("test-service", 200*time.Millisecond)
require.NoError(t, err)

// Register the same daemon service again. This should fail, and 50ms update frequency should be ignored.
err = ufm.RegisterDaemonService("test-service", 50*time.Millisecond)
require.ErrorContains(t, err, "service already registered")

// Confirm that the original 200ms update frequency is still in effect. 50ms would have triggered a panic.
// Note there is a possibility that 200ms will still cause a panic due to the semantics of Sleep, which is
// Confirm that the original 200ms update frequency is still in effect. 50ms would have triggered an error log.
// Note there is a possibility that 200ms will still cause an error log due to the semantics of Sleep, which is
// not guaranteed to sleep for exactly the specified duration.
time.Sleep(80 * time.Millisecond)
require.NoError(t, ufm.RegisterValidResponse("test-service"))
time.Sleep(80 * time.Millisecond)
ufm.Stop()

// Assert no calls to the logger were made.
mock.AssertExpectationsForObjects(t, logger)
}

func TestRegisterDaemonServiceWithCallback_DoubleRegistrationFails(t *testing.T) {
// lock synchronizes callback flags and was added to avoid race test failures.
callback1Called := atomic.Bool{}
callback2Called := atomic.Bool{}

ufm := NewUpdateFrequencyMonitor()
ufm, _ := createTestMonitor()
// First registration should succeed.
err := ufm.RegisterDaemonServiceWithCallback("test-service", 200*time.Millisecond, func() {
callback1Called.Store(true)
Expand Down Expand Up @@ -100,17 +129,18 @@ func TestRegisterDaemonServiceWithCallback_DoubleRegistrationFails(t *testing.T)
}

func TestRegisterDaemonService_RegistrationFailsAfterStop(t *testing.T) {
ufm := NewUpdateFrequencyMonitor()
ufm, logger := createTestMonitor()
ufm.Stop()
err := ufm.RegisterDaemonService("test-service", 50*time.Millisecond)
require.ErrorContains(t, err, "monitor has been stopped")

// Any accidentally scheduled functions with panics should fire before this timer expires.
// Any scheduled functions with error logs that were not cleaned up should trigger before this sleep finishes.
time.Sleep(100 * time.Millisecond)
mock.AssertExpectationsForObjects(t, logger)
}

func TestRegisterDaemonServiceWithCallback_RegistrationFailsAfterStop(t *testing.T) {
ufm := NewUpdateFrequencyMonitor()
ufm, _ := createTestMonitor()
ufm.Stop()

callbackCalled := atomic.Bool{}
Expand All @@ -129,18 +159,31 @@ func TestRegisterDaemonServiceWithCallback_RegistrationFailsAfterStop(t *testing
}

func TestRegisterValidResponse_NegativeUpdateDelay(t *testing.T) {
ufm := NewUpdateFrequencyMonitor()
ufm, logger := createTestMonitor()
err := ufm.RegisterDaemonService("test-service", -50*time.Millisecond)
require.ErrorContains(t, err, "update delay -50ms must be positive")

// Sanity check: no calls to the logger should have been made.
mock.AssertExpectationsForObjects(t, logger)
}

func TestRegisterValidResponseWithCallback_NegativeUpdateDelay(t *testing.T) {
ufm := NewUpdateFrequencyMonitor()
ufm, _ := createTestMonitor()
err := ufm.RegisterDaemonServiceWithCallback("test-service", -50*time.Millisecond, func() {})
require.ErrorContains(t, err, "update delay -50ms must be positive")
}

func TestPanicServiceNotResponding(t *testing.T) {
panicFunc := PanicServiceNotResponding("test-service")
panicFunc := types.PanicServiceNotResponding("test-service")
require.Panics(t, panicFunc)
}

func TestLogErrorServiceNotResponding(t *testing.T) {
logger := &mocks.Logger{}
logger.On("Error", "daemon not responding", "service", "test-service").Return()
logFunc := types.LogErrorServiceNotResponding("test-service", logger)
logFunc()

// Assert: the logger was called with the expected arguments.
mock.AssertExpectationsForObjects(t, logger)
}
2 changes: 1 addition & 1 deletion protocol/daemons/server/types/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func TestMaximumAcceptableUpdateDelay(t *testing.T) {
loopDelayMs := uint32(1000)
expected := time.Duration(MaximumLoopDelayMultiple * loopDelayMs * 1000000)
expected := time.Duration(MaximumLoopDelayMultiple*loopDelayMs) * time.Millisecond
actual := MaximumAcceptableUpdateDelay(loopDelayMs)
require.Equal(t, expected, actual)
}

0 comments on commit e2f96b8

Please sign in to comment.