From d82b307a41cebd46829ef13a3cc824be98656f7d Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Wed, 2 Oct 2024 17:08:49 +0200 Subject: [PATCH] [OPIK-144] Fix condition when limit expires and application tries to retrieve to check available permits --- .../ratelimit/RateLimitInterceptor.java | 12 +++++----- .../ratelimit/RateLimitService.java | 9 +++---- .../redis/RedisRateLimitService.java | 24 ++++++++++++------- .../ratelimit/RateLimitE2ETest.java | 12 ++++++++++ 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitInterceptor.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitInterceptor.java index aef9e154dc..f8e80071a5 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitInterceptor.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitInterceptor.java @@ -61,7 +61,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable { try { return invocation.proceed(); } finally { - setLimitHeaders(apiKey, limitBucket); + setLimitHeaders(apiKey, limitBucket, generalLimit); } } @@ -69,21 +69,21 @@ private void verifyRateLimit(long events, String apiKey, String bucket, LimitCon // Check if the rate limit is exceeded Boolean limitExceeded = rateLimitService.get() - .isLimitExceeded(apiKey, events, bucket, limitConfig.limit(), limitConfig.durationInSeconds()) + .isLimitExceeded(apiKey, events, bucket, limitConfig) .block(); if (Boolean.TRUE.equals(limitExceeded)) { - setLimitHeaders(apiKey, bucket); + setLimitHeaders(apiKey, bucket, limitConfig); throw new ClientErrorException("Too Many Requests", HttpStatus.SC_TOO_MANY_REQUESTS); } } - private void setLimitHeaders(String apiKey, String bucket) { + private void setLimitHeaders(String apiKey, String bucket, LimitConfig limitConfig) { requestContext.get().getHeaders().put(RequestContext.USER_LIMIT, List.of(bucket)); requestContext.get().getHeaders().put(RequestContext.USER_LIMIT_REMAINING_TTL, - List.of("" + rateLimitService.get().getRemainingTTL(apiKey, bucket).block())); + List.of("" + rateLimitService.get().getRemainingTTL(apiKey, bucket, limitConfig).block())); requestContext.get().getHeaders().put(RequestContext.USER_REMAINING_LIMIT, - List.of("" + rateLimitService.get().availableEvents(apiKey, bucket).block())); + List.of("" + rateLimitService.get().availableEvents(apiKey, bucket, limitConfig).block())); } private Object getParameters(MethodInvocation method) { diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitService.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitService.java index 68940bcae8..1027551ee4 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ratelimit/RateLimitService.java @@ -2,12 +2,13 @@ import reactor.core.publisher.Mono; +import static com.comet.opik.infrastructure.RateLimitConfig.LimitConfig; + public interface RateLimitService { - Mono isLimitExceeded(String apiKey, long events, String bucketName, long limit, - long limitDurationInSeconds); + Mono isLimitExceeded(String apiKey, long events, String bucketName, LimitConfig limitConfig); - Mono availableEvents(String apiKey, String bucketName); + Mono availableEvents(String apiKey, String bucketName, LimitConfig limitConfig); - Mono getRemainingTTL(String apiKey, String bucket); + Mono getRemainingTTL(String apiKey, String bucket, LimitConfig limitConfig); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java index 5d78439909..83a27fce02 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java @@ -11,6 +11,8 @@ import java.time.Duration; +import static com.comet.opik.infrastructure.RateLimitConfig.LimitConfig; + @RequiredArgsConstructor public class RedisRateLimitService implements RateLimitService { @@ -19,27 +21,33 @@ public class RedisRateLimitService implements RateLimitService { private final RedissonReactiveClient redisClient; @Override - public Mono isLimitExceeded(String apiKey, long events, String bucketName, long limit, - long limitDurationInSeconds) { + public Mono isLimitExceeded(@NonNull String apiKey, long events, @NonNull String bucketName, @NonNull LimitConfig limitConfig) { RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey)); - return rateLimit.trySetRate(RateType.OVERALL, limit, limitDurationInSeconds, RateIntervalUnit.SECONDS) - .then(Mono.defer(() -> rateLimit.expireIfNotSet(Duration.ofSeconds(limitDurationInSeconds)))) + return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit) .then(Mono.defer(() -> rateLimit.tryAcquire(events))) .map(Boolean.FALSE::equals); } + private Mono setLimitIfNecessary(long limit, long limitDurationInSeconds, RRateLimiterReactive rateLimit) { + return rateLimit.isExists() + .flatMap(exists -> Boolean.TRUE.equals(exists) ? Mono.empty() : rateLimit.trySetRate(RateType.OVERALL, limit, limitDurationInSeconds, RateIntervalUnit.SECONDS)) + .then(Mono.defer(() -> rateLimit.expireIfNotSet(Duration.ofSeconds(limitDurationInSeconds)))); + } + @Override - public Mono availableEvents(@NonNull String apiKey, @NonNull String bucketName) { + public Mono availableEvents(@NonNull String apiKey, @NonNull String bucketName, @NonNull LimitConfig limitConfig) { RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey)); - return rateLimit.availablePermits(); + return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit) + .then(Mono.defer(rateLimit::availablePermits)); } @Override - public Mono getRemainingTTL(@NonNull String apiKey, @NonNull String bucketName) { + public Mono getRemainingTTL(@NonNull String apiKey, @NonNull String bucketName, @NonNull LimitConfig limitConfig) { RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey)); - return rateLimit.remainTimeToLive(); + return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit) + .then(Mono.defer(rateLimit::remainTimeToLive)); } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/ratelimit/RateLimitE2ETest.java b/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/ratelimit/RateLimitE2ETest.java index b889d7b2c8..512fcb757d 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/ratelimit/RateLimitE2ETest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/ratelimit/RateLimitE2ETest.java @@ -581,6 +581,18 @@ void rateLimit__whenCustomRatedBeanMethodIsCalled__thenRateLimitIsApplied() { } + @Test + @DisplayName("Rate limit: When rate limit is not set, Then set and and return limit") + void rateLimit__whenCustomRatedBeanMethodIsCalled__thenRateLimitIsApplied(RateLimitService rateLimitService) { + String apiKey = UUID.randomUUID().toString(); + int limit = 100; + + Long availableEvents = rateLimitService.availableEvents(apiKey, "generalLimit", new LimitConfig(limit, 1)) + .block(); + + assertEquals(limit, availableEvents); + } + private static void assertLimitHeaders(Response response, long expected, String limitBucket, int limitDuration) { String remainingLimit = response.getHeaderString(RequestContext.USER_REMAINING_LIMIT); String userLimit = response.getHeaderString(RequestContext.USER_LIMIT);