Skip to content

Commit

Permalink
[OPIK-159] Experiment delete endpoint (#336)
Browse files Browse the repository at this point in the history
* [OPIK-159] Experiment delete endpoint

* Add Batch delete

* Add auth tests for experiments delete

* Fix tests
  • Loading branch information
thiagohora authored Oct 4, 2024
1 parent ea6cf3e commit 471cc47
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.Set;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ExperimentsDelete(@NotNull @Size(min = 1, max = 1000) Set<UUID> ids) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.comet.opik.api.ExperimentItemsBatch;
import com.comet.opik.api.ExperimentItemsDelete;
import com.comet.opik.api.ExperimentSearchCriteria;
import com.comet.opik.api.ExperimentsDelete;
import com.comet.opik.domain.ExperimentItemService;
import com.comet.opik.domain.ExperimentService;
import com.comet.opik.domain.IdGenerator;
Expand Down Expand Up @@ -132,6 +133,21 @@ public Response create(
return Response.created(uri).build();
}

@POST
@Path("/delete")
@Operation(operationId = "deleteExperimentsById", summary = "Delete experiments by id", description = "Delete experiments by id", responses = {
@ApiResponse(responseCode = "204", description = "No content")})
public Response deleteExperimentsById(
@RequestBody(content = @Content(schema = @Schema(implementation = ExperimentsDelete.class))) @NotNull @Valid ExperimentsDelete request) {

log.info("Deleting experiments, count '{}'", request.ids());
experimentService.delete(request.ids())
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();
log.info("Deleted experiments, count '{}'", request.ids());
return Response.noContent().build();
}

// Experiment Item Resources

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.math.BigDecimal;
import java.time.Instant;
Expand Down Expand Up @@ -362,6 +363,13 @@ AND ilike(name, CONCAT('%', :name, '%'))
;
""";

private static final String DELETE_BY_IDS = """
DELETE FROM experiments
WHERE id IN :ids
AND workspace_id = :workspace_id
;
""";

private final @NonNull ConnectionFactory connectionFactory;

Mono<Void> insert(@NonNull Experiment experiment) {
Expand Down Expand Up @@ -526,4 +534,29 @@ public Flux<WorkspaceAndResourceId> getExperimentWorkspaces(@NonNull Set<UUID> e
row.get("workspace_id", String.class),
row.get("id", UUID.class))));
}

public Mono<Long> delete(Set<UUID> ids) {

Preconditions.checkArgument(CollectionUtils.isNotEmpty(ids), "Argument 'ids' must not be empty");

log.info("Deleting experiments by ids [{}]", Arrays.toString(ids.toArray()));

return Mono.from(connectionFactory.create())
.flatMapMany(connection -> delete(ids, connection))
.reduce(Long::sum)
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
log.info("Deleted experiments by ids [{}]", Arrays.toString(ids.toArray()));
}
});
}

private Publisher<Long> delete(Set<UUID> ids, Connection connection) {

var statement = connection.createStatement(DELETE_BY_IDS)
.bind("ids", ids.toArray(UUID[]::new));

return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
.flatMap(Result::getRowsUpdated);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -117,6 +119,13 @@ INSERT INTO experiment_items (
;
""";

private static final String DELETE_BY_EXPERIMENT_ID = """
DELETE FROM experiment_items
WHERE experiment_id IN :experiment_ids
AND workspace_id = :workspace_id
;
""";

private final @NonNull ConnectionFactory connectionFactory;

public Flux<ExperimentSummary> findExperimentSummaryByDatasetIds(Collection<UUID> datasetIds) {
Expand Down Expand Up @@ -260,4 +269,30 @@ private Publisher<? extends Result> delete(Set<UUID> ids, Connection connection)

return makeFluxContextAware(bindWorkspaceIdToFlux(statement));
}

public Mono<Long> deleteByExperimentIds(Set<UUID> experimentIds) {

Preconditions.checkArgument(CollectionUtils.isNotEmpty(experimentIds),
"Argument 'experimentIds' must not be empty");

log.info("Deleting experiment items by experiment ids [{}]", Arrays.toString(experimentIds.toArray()));

return Mono.from(connectionFactory.create())
.flatMapMany(connection -> deleteByExperimentIds(experimentIds, connection))
.reduce(0L, Long::sum)
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
log.info("Deleted experiment items by experiment ids [{}]",
Arrays.toString(experimentIds.toArray()));
}
});
}

private Publisher<Long> deleteByExperimentIds(Set<UUID> ids, Connection connection) {
Statement statement = connection.createStatement(DELETE_BY_EXPERIMENT_ID)
.bind("experiment_ids", ids.toArray(UUID[]::new));

return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
.flatMap(Result::getRowsUpdated);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class ExperimentService {

private final @NonNull ExperimentDAO experimentDAO;
private final @NonNull ExperimentItemDAO experimentItemDAO;
private final @NonNull DatasetService datasetService;
private final @NonNull IdGenerator idGenerator;
private final @NonNull NameGenerator nameGenerator;
Expand Down Expand Up @@ -147,4 +148,10 @@ public Mono<Boolean> validateExperimentWorkspace(@NonNull String workspaceId, @N
return experimentDAO.getExperimentWorkspaces(experimentIds)
.all(experimentWorkspace -> workspaceId.equals(experimentWorkspace.workspaceId()));
}

public Mono<Void> delete(@NonNull Set<UUID> ids) {
return experimentDAO.delete(ids)
.then(Mono.defer(() -> experimentItemDAO.deleteByExperimentIds(ids)))
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class RedisRateLimitService implements RateLimitService {
private final RedissonReactiveClient redisClient;

@Override
public Mono<Boolean> isLimitExceeded(@NonNull String apiKey, long events, @NonNull String bucketName, @NonNull LimitConfig limitConfig) {
public Mono<Boolean> isLimitExceeded(@NonNull String apiKey, long events, @NonNull String bucketName,
@NonNull LimitConfig limitConfig) {

RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));

Expand All @@ -32,19 +33,24 @@ public Mono<Boolean> isLimitExceeded(@NonNull String apiKey, long events, @NonNu

private Mono<Boolean> setLimitIfNecessary(long limit, long limitDurationInSeconds, RRateLimiterReactive rateLimit) {
return rateLimit.isExists()
.flatMap(exists -> Boolean.TRUE.equals(exists) ? Mono.empty() : rateLimit.trySetRate(RateType.OVERALL, limit, limitDurationInSeconds, RateIntervalUnit.SECONDS))
.flatMap(exists -> Boolean.TRUE.equals(exists)
? Mono.empty()
: rateLimit.trySetRate(RateType.OVERALL, limit, limitDurationInSeconds,
RateIntervalUnit.SECONDS))
.then(Mono.defer(() -> rateLimit.expireIfNotSet(Duration.ofSeconds(limitDurationInSeconds))));
}

@Override
public Mono<Long> availableEvents(@NonNull String apiKey, @NonNull String bucketName, @NonNull LimitConfig limitConfig) {
public Mono<Long> availableEvents(@NonNull String apiKey, @NonNull String bucketName,
@NonNull LimitConfig limitConfig) {
RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(rateLimit::availablePermits));
}

@Override
public Mono<Long> getRemainingTTL(@NonNull String apiKey, @NonNull String bucketName, @NonNull LimitConfig limitConfig) {
public Mono<Long> getRemainingTTL(@NonNull String apiKey, @NonNull String bucketName,
@NonNull LimitConfig limitConfig) {
RRateLimiterReactive rateLimit = redisClient.getRateLimiter(KEY.formatted(bucketName, apiKey));
return setLimitIfNecessary(limitConfig.limit(), limitConfig.durationInSeconds(), rateLimit)
.then(Mono.defer(rateLimit::remainTimeToLive));
Expand Down
Loading

0 comments on commit 471cc47

Please sign in to comment.