Skip to content

Commit

Permalink
[OPIK-144] Fix condition when limit expires and application tries to …
Browse files Browse the repository at this point in the history
…retrieve to check available permits
  • Loading branch information
thiagohora committed Oct 2, 2024
1 parent 0312b7e commit d82b307
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,29 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
try {
return invocation.proceed();
} finally {
setLimitHeaders(apiKey, limitBucket);
setLimitHeaders(apiKey, limitBucket, generalLimit);
}
}

private void verifyRateLimit(long events, String apiKey, String bucket, LimitConfig limitConfig) {

// Check if the rate limit is exceeded
Boolean limitExceeded = rateLimitService.get()
.isLimitExceeded(apiKey, events, bucket, limitConfig.limit(), limitConfig.durationInSeconds())
.isLimitExceeded(apiKey, events, bucket, limitConfig)
.block();

if (Boolean.TRUE.equals(limitExceeded)) {
setLimitHeaders(apiKey, bucket);
setLimitHeaders(apiKey, bucket, limitConfig);
throw new ClientErrorException("Too Many Requests", HttpStatus.SC_TOO_MANY_REQUESTS);
}
}

private void setLimitHeaders(String apiKey, String bucket) {
private void setLimitHeaders(String apiKey, String bucket, LimitConfig limitConfig) {
requestContext.get().getHeaders().put(RequestContext.USER_LIMIT, List.of(bucket));
requestContext.get().getHeaders().put(RequestContext.USER_LIMIT_REMAINING_TTL,
List.of("" + rateLimitService.get().getRemainingTTL(apiKey, bucket).block()));
List.of("" + rateLimitService.get().getRemainingTTL(apiKey, bucket, limitConfig).block()));
requestContext.get().getHeaders().put(RequestContext.USER_REMAINING_LIMIT,
List.of("" + rateLimitService.get().availableEvents(apiKey, bucket).block()));
List.of("" + rateLimitService.get().availableEvents(apiKey, bucket, limitConfig).block()));
}

private Object getParameters(MethodInvocation method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import reactor.core.publisher.Mono;

import static com.comet.opik.infrastructure.RateLimitConfig.LimitConfig;

public interface RateLimitService {

Mono<Boolean> isLimitExceeded(String apiKey, long events, String bucketName, long limit,
long limitDurationInSeconds);
Mono<Boolean> isLimitExceeded(String apiKey, long events, String bucketName, LimitConfig limitConfig);

Mono<Long> availableEvents(String apiKey, String bucketName);
Mono<Long> availableEvents(String apiKey, String bucketName, LimitConfig limitConfig);

Mono<Long> getRemainingTTL(String apiKey, String bucket);
Mono<Long> getRemainingTTL(String apiKey, String bucket, LimitConfig limitConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import java.time.Duration;

import static com.comet.opik.infrastructure.RateLimitConfig.LimitConfig;

@RequiredArgsConstructor
public class RedisRateLimitService implements RateLimitService {

Expand All @@ -19,27 +21,33 @@ public class RedisRateLimitService implements RateLimitService {
private final RedissonReactiveClient redisClient;

@Override
public Mono<Boolean> isLimitExceeded(String apiKey, long events, String bucketName, long limit,
long limitDurationInSeconds) {
public Mono<Boolean> isLimitExceeded(@NonNull String apiKey, long events, @NonNull String bucketName, @NonNull LimitConfig limitConfig) {

RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));

return rateLimit.trySetRate(RateType.OVERALL, limit, limitDurationInSeconds, RateIntervalUnit.SECONDS)
.then(Mono.defer(() -> rateLimit.expireIfNotSet(Duration.ofSeconds(limitDurationInSeconds))))
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(() -> rateLimit.tryAcquire(events)))
.map(Boolean.FALSE::equals);
}

private Mono<Boolean> setLimitIfNecessary(long limit, long limitDurationInSeconds, RRateLimiterReactive rateLimit) {
return rateLimit.isExists()
.flatMap(exists -> Boolean.TRUE.equals(exists) ? Mono.empty() : rateLimit.trySetRate(RateType.OVERALL, limit, limitDurationInSeconds, RateIntervalUnit.SECONDS))
.then(Mono.defer(() -> rateLimit.expireIfNotSet(Duration.ofSeconds(limitDurationInSeconds))));
}

@Override
public Mono<Long> availableEvents(@NonNull String apiKey, @NonNull String bucketName) {
public Mono<Long> availableEvents(@NonNull String apiKey, @NonNull String bucketName, @NonNull LimitConfig limitConfig) {
RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));
return rateLimit.availablePermits();
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(rateLimit::availablePermits));
}

@Override
public Mono<Long> getRemainingTTL(@NonNull String apiKey, @NonNull String bucketName) {
public Mono<Long> getRemainingTTL(@NonNull String apiKey, @NonNull String bucketName, @NonNull LimitConfig limitConfig) {
RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));
return rateLimit.remainTimeToLive();
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(rateLimit::remainTimeToLive));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,18 @@ void rateLimit__whenCustomRatedBeanMethodIsCalled__thenRateLimitIsApplied() {

}

@Test
@DisplayName("Rate limit: When rate limit is not set, Then set and and return limit")
void rateLimit__whenCustomRatedBeanMethodIsCalled__thenRateLimitIsApplied(RateLimitService rateLimitService) {
String apiKey = UUID.randomUUID().toString();
int limit = 100;

Long availableEvents = rateLimitService.availableEvents(apiKey, "generalLimit", new LimitConfig(limit, 1))
.block();

assertEquals(limit, availableEvents);
}

private static void assertLimitHeaders(Response response, long expected, String limitBucket, int limitDuration) {
String remainingLimit = response.getHeaderString(RequestContext.USER_REMAINING_LIMIT);
String userLimit = response.getHeaderString(RequestContext.USER_LIMIT);
Expand Down

0 comments on commit d82b307

Please sign in to comment.