Skip to content

Commit

Permalink
Merge branch 'master' into AndriiDiachuk/TestMessageIndexTransactionS…
Browse files Browse the repository at this point in the history
…tatusesProviderResponse_HappyPath-fix-flaky-test
  • Loading branch information
AndriiDiachuk authored Jan 9, 2025
2 parents 82d9e8d + 98e7b14 commit 8644455
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
8 changes: 8 additions & 0 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -130,6 +132,7 @@ type Controller struct {
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProviderFactory dp.DataProviderFactory
dataProvidersGroup *sync.WaitGroup
limiter *rate.Limiter
}

func NewWebSocketController(
Expand All @@ -146,6 +149,7 @@ func NewWebSocketController(
dataProviders: concurrentmap.New[uuid.UUID, dp.DataProvider](),
dataProviderFactory: dataProviderFactory,
dataProvidersGroup: &sync.WaitGroup{},
limiter: rate.NewLimiter(rate.Limit(config.MaxResponsesPerSecond), 1),
}
}

Expand Down Expand Up @@ -265,6 +269,10 @@ func (c *Controller) writeMessages(ctx context.Context) error {
return nil
}

if err := c.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limiter wait failed: %w", err)
}

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
Expand Down
66 changes: 66 additions & 0 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,72 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
})
}

// TestRateLimiter tests the rate-limiting functionality of the WebSocket controller.
//
// Test Steps:
// 1. Create a mock WebSocket connection with behavior for `SetWriteDeadline` and `WriteJSON`.
// 2. Configure the WebSocket controller with a rate limit of 2 responses per second.
// 3. Simulate sending messages to the `multiplexedStream` channel.
// 4. Collect timestamps of message writes to verify rate-limiting behavior.
// 5. Assert that all messages are processed and that the delay between messages respects the configured rate limit.
//
// The test ensures that:
// - The number of messages processed matches the total messages sent.
// - The delay between consecutive messages falls within the expected range based on the rate limit, with a tolerance of 5ms.
func (s *WsControllerSuite) TestRateLimiter() {
t := s.T()
totalMessages := 5 // Number of messages to simulate.

// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)

// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)

// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
}
close(controller.multiplexedStream)
}()

// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
msgCounter := 0
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())

// Extract the actual written message
actualMessage := args.Get(0).(map[string]interface{})
expectedMessage := map[string]interface{}{"message": msgCounter}
msgCounter++

assert.Equal(t, expectedMessage, actualMessage, "Received message does not match the expected message")
}).Return(nil).Times(totalMessages)

// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())

// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")

// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = float64(5 * time.Millisecond) // Allow up to 5ms deviation.

// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.InDelta(t, expectedDelay, delay, tolerance, "Messages should respect the rate limit")
}
}

// TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly.
func (s *WsControllerSuite) TestConfigureKeepaliveConnection() {
s.T().Run("Happy path", func(t *testing.T) {
Expand Down

0 comments on commit 8644455

Please sign in to comment.