From 4c0c2f8a5de8d28f0e75db3a1d2b14932969e82f Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Fri, 3 Jan 2025 18:21:23 +0200 Subject: [PATCH 1/3] removed redundunt comment --- engine/access/rest/websockets/controller.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go index 2b078f715e5..99d9fbc10a9 100644 --- a/engine/access/rest/websockets/controller.go +++ b/engine/access/rest/websockets/controller.go @@ -268,8 +268,7 @@ func (c *Controller) writeMessages(ctx context.Context) error { if !ok { return nil } - - // wait for the rate limiter to allow the next message write. + if err := c.limiter.WaitN(ctx, 1); err != nil { return fmt.Errorf("rate limiter wait failed: %w", err) } From b7c3f836aafd66e7e738f7ed4daa579ab660ff73 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 9 Jan 2025 12:41:34 +0200 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Yurii Oleksyshyn --- engine/access/rest/websockets/controller_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go index 7dfb4cccfe2..18366594b62 100644 --- a/engine/access/rest/websockets/controller_test.go +++ b/engine/access/rest/websockets/controller_test.go @@ -681,7 +681,6 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { // - The number of messages processed matches the total messages sent. // - The delay between consecutive messages falls within the expected range based on the rate limit, with a tolerance of 5ms. func (s *WsControllerSuite) TestRateLimiter() { - s.T().Run("Enforces response rate limit", func(t *testing.T) { totalMessages := 5 // Number of messages to simulate. // Step 1: Create a mock WebSocket connection. @@ -725,7 +724,6 @@ func (s *WsControllerSuite) TestRateLimiter() { assert.GreaterOrEqual(t, delay, expectedDelay-tolerance, "Messages should respect the minimum rate limit") assert.LessOrEqual(t, delay, expectedDelay+tolerance, "Messages should respect the maximum rate limit") } - }) } // TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly. From 78192e0446193f10f0f62ab6769971183f2dfeb6 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 9 Jan 2025 13:00:04 +0200 Subject: [PATCH 3/3] Fixed remarks --- engine/access/rest/websockets/controller.go | 2 +- .../access/rest/websockets/controller_test.go | 88 ++++++++++--------- 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go index 99d9fbc10a9..fe3325b576a 100644 --- a/engine/access/rest/websockets/controller.go +++ b/engine/access/rest/websockets/controller.go @@ -268,7 +268,7 @@ func (c *Controller) writeMessages(ctx context.Context) error { if !ok { return nil } - + if err := c.limiter.WaitN(ctx, 1); err != nil { return fmt.Errorf("rate limiter wait failed: %w", err) } diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go index 18366594b62..4b3795f61b7 100644 --- a/engine/access/rest/websockets/controller_test.go +++ b/engine/access/rest/websockets/controller_test.go @@ -681,49 +681,57 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { // - The number of messages processed matches the total messages sent. // - The delay between consecutive messages falls within the expected range based on the rate limit, with a tolerance of 5ms. func (s *WsControllerSuite) TestRateLimiter() { - totalMessages := 5 // Number of messages to simulate. + t := s.T() + totalMessages := 5 // Number of messages to simulate. - // Step 1: Create a mock WebSocket connection. - conn := connmock.NewWebsocketConnection(t) - conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages) - - // Step 2: Configure the WebSocket controller with a rate limit. - config := NewDefaultWebsocketConfig() - config.MaxResponsesPerSecond = 2 // 2 messages per second. - controller := NewWebSocketController(s.logger, config, conn, nil) - - // Step 3: Simulate sending messages to the controller's `multiplexedStream`. - go func() { - for i := 0; i < totalMessages; i++ { - controller.multiplexedStream <- map[string]interface{}{ - "message": i, - } + // Step 1: Create a mock WebSocket connection. + conn := connmock.NewWebsocketConnection(t) + conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages) + + // Step 2: Configure the WebSocket controller with a rate limit. + config := NewDefaultWebsocketConfig() + config.MaxResponsesPerSecond = 2 // 2 messages per second. + controller := NewWebSocketController(s.logger, config, conn, nil) + + // Step 3: Simulate sending messages to the controller's `multiplexedStream`. + go func() { + for i := 0; i < totalMessages; i++ { + controller.multiplexedStream <- map[string]interface{}{ + "message": i, } - close(controller.multiplexedStream) - }() - - // Step 4: Collect timestamps of message writes for verification. - var timestamps []time.Time - conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) { - timestamps = append(timestamps, time.Now()) - }).Return(nil).Times(totalMessages) - - // Invoke the `writeMessages` method to process the stream. - _ = controller.writeMessages(context.Background()) - - // Step 5: Verify that all messages are processed. - require.Len(t, timestamps, totalMessages, "All messages should be processed") - - // Calculate the expected delay between messages based on the rate limit. - expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond) - const tolerance = 5 * time.Millisecond // Allow up to 5ms deviation. - - // Step 6: Assert that the delays respect the rate limit with tolerance. - for i := 1; i < len(timestamps); i++ { - delay := timestamps[i].Sub(timestamps[i-1]) - assert.GreaterOrEqual(t, delay, expectedDelay-tolerance, "Messages should respect the minimum rate limit") - assert.LessOrEqual(t, delay, expectedDelay+tolerance, "Messages should respect the maximum rate limit") } + close(controller.multiplexedStream) + }() + + // Step 4: Collect timestamps of message writes for verification. + var timestamps []time.Time + msgCounter := 0 + conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) { + timestamps = append(timestamps, time.Now()) + + // Extract the actual written message + actualMessage := args.Get(0).(map[string]interface{}) + expectedMessage := map[string]interface{}{"message": msgCounter} + msgCounter++ + + assert.Equal(t, expectedMessage, actualMessage, "Received message does not match the expected message") + }).Return(nil).Times(totalMessages) + + // Invoke the `writeMessages` method to process the stream. + _ = controller.writeMessages(context.Background()) + + // Step 5: Verify that all messages are processed. + require.Len(t, timestamps, totalMessages, "All messages should be processed") + + // Calculate the expected delay between messages based on the rate limit. + expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond) + const tolerance = float64(5 * time.Millisecond) // Allow up to 5ms deviation. + + // Step 6: Assert that the delays respect the rate limit with tolerance. + for i := 1; i < len(timestamps); i++ { + delay := timestamps[i].Sub(timestamps[i-1]) + assert.InDelta(t, expectedDelay, delay, tolerance, "Messages should respect the rate limit") + } } // TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly.