Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OPIK-72 Add batch remove traces endpoint #256

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.List;

@Builder(toBuilder = true)
public record SpanBatch(@NotNull @Size(min = 1, max = 1000) @JsonView( {
Span.View.Write.class}) @Valid List<Span> spans){
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.List;

@Builder(toBuilder = true)
public record TraceBatch(@NotNull @Size(min = 1, max = 1000) @JsonView( {
Trace.View.Write.class}) @Valid List<Trace> traces){
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.Set;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record TracesDelete(@NotNull @Size(min = 1, max = 1000) Set<UUID> ids) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.TracesDelete;
import com.comet.opik.api.filter.FiltersFactory;
import com.comet.opik.api.filter.TraceFilter;
import com.comet.opik.domain.FeedbackScoreService;
Expand Down Expand Up @@ -60,7 +61,7 @@
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @jakarta.inject.Inject)
@Tag(name = "Traces", description = "Trace related resources")
public class TraceResource {
public class TracesResource {

private final @NonNull TraceService service;
private final @NonNull FeedbackScoreService feedbackScoreService;
Expand Down Expand Up @@ -206,6 +207,20 @@ public Response deleteById(@PathParam("id") UUID id) {
return Response.noContent().build();
}

@POST
@Path("/delete")
@Operation(operationId = "deleteTraces", summary = "Delete traces", description = "Delete traces", responses = {
@ApiResponse(responseCode = "204", description = "No Content")})
public Response deleteTraces(
@RequestBody(content = @Content(schema = @Schema(implementation = TracesDelete.class))) @NotNull @Valid TracesDelete request) {
log.info("Deleting traces, count '{}'", request.ids().size());
service.delete(request.ids())
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();
log.info("Deleted traces, count '{}'", request.ids().size());
return Response.noContent().build();
}

@PUT
@Path("/{id}/feedback-scores")
@Operation(operationId = "addTraceFeedbackScore", summary = "Add trace feedback score", description = "Add trace feedback score", responses = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
import com.comet.opik.api.FeedbackScore;
import com.comet.opik.api.FeedbackScoreBatchItem;
import com.comet.opik.api.ScoreSource;
import com.google.common.base.Preconditions;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.Statement;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Flux;
Expand All @@ -24,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -52,14 +55,16 @@ enum EntityType {

Mono<Void> deleteScoreFrom(EntityType entityType, UUID id, String name, Connection connection);

Mono<Void> deleteByEntityId(EntityType entityType, UUID id, Connection connection);
Mono<Void> deleteByEntityId(EntityType entityType, UUID entityId, Connection connection);

Mono<Long> scoreBatchOf(EntityType entityType, List<FeedbackScoreBatchItem> scores, Connection connection);
Mono<Void> deleteByEntityIds(EntityType entityType, Set<UUID> entityIds, Connection connection);

Mono<Long> scoreBatchOf(EntityType entityType, List<FeedbackScoreBatchItem> scores, Connection connection);
}

@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Slf4j
class FeedbackScoreDAOImpl implements FeedbackScoreDAO {

record FeedbackScoreDto(UUID entityId, FeedbackScore score) {
Expand Down Expand Up @@ -230,21 +235,21 @@ LEFT JOIN (
;
""";

private static final String DELETE_SPAN_CASCADE_FEEDBACK_SCORE = """
private static final String DELETE_SPANS_CASCADE_FEEDBACK_SCORE = """
DELETE FROM feedback_scores
WHERE entity_type = 'span'
AND entity_id IN (
SELECT id
FROM spans
WHERE trace_id = :trace_id
WHERE trace_id IN :trace_ids
)
AND workspace_id = :workspace_id
;
""";

private static final String DELETE_FEEDBACK_SCORE_BY_ENTITY_ID = """
private static final String DELETE_FEEDBACK_SCORE_BY_ENTITY_IDS = """
DELETE FROM feedback_scores
WHERE entity_id = :entity_id
WHERE entity_id IN :entity_ids
AND entity_type = :entity_type
AND workspace_id = :workspace_id
;
Expand Down Expand Up @@ -410,30 +415,39 @@ public Mono<Void> deleteScoreFrom(EntityType entityType, UUID id, String name, C
}

@Override
public Mono<Void> deleteByEntityId(@NonNull EntityType entityType, @NonNull UUID id,
@NonNull Connection connection) {
public Mono<Void> deleteByEntityId(
@NonNull EntityType entityType, @NonNull UUID entityId, @NonNull Connection connection) {
return deleteByEntityIds(entityType, Set.of(entityId), connection);
}

@Override
public Mono<Void> deleteByEntityIds(
@NonNull EntityType entityType, Set<UUID> entityIds, @NonNull Connection connection) {
Preconditions.checkArgument(
CollectionUtils.isNotEmpty(entityIds), "Argument 'entityIds' must not be empty");
log.info("Deleting feedback scores for entityType '{}', entityIds count '{}'", entityType, entityIds.size());
return switch (entityType) {
case TRACE -> cascadeSpanDelete(id, connection)
case TRACE -> cascadeSpanDelete(entityIds, connection)
.flatMap(result -> Mono.from(result.getRowsUpdated()))
.then(Mono.defer(() -> deleteScoresByEntityId(entityType, id, connection)))
.then(Mono.defer(() -> deleteScoresByEntityIds(entityType, entityIds, connection)))
.then();
case SPAN -> deleteScoresByEntityId(entityType, id, connection)
case SPAN -> deleteScoresByEntityIds(entityType, entityIds, connection)
.then();
};
}

private Mono<? extends Result> cascadeSpanDelete(UUID id, Connection connection) {
var statement = connection.createStatement(DELETE_SPAN_CASCADE_FEEDBACK_SCORE)
.bind("trace_id", id);

private Mono<? extends Result> cascadeSpanDelete(Set<UUID> traceIds, Connection connection) {
log.info("Deleting feedback scores by span entityId, traceIds count '{}'", traceIds.size());
var statement = connection.createStatement(DELETE_SPANS_CASCADE_FEEDBACK_SCORE)
.bind("trace_ids", traceIds.toArray(UUID[]::new));
return makeMonoContextAware(bindWorkspaceIdToMono(statement));
}

private Mono<Long> deleteScoresByEntityId(EntityType entityType, UUID id, Connection connection) {
Statement statement = connection.createStatement(DELETE_FEEDBACK_SCORE_BY_ENTITY_ID)
.bind("entity_id", id)
private Mono<Long> deleteScoresByEntityIds(EntityType entityType, Set<UUID> entityIds, Connection connection) {
log.info("Deleting feedback scores by entityType '{}', entityIds count '{}'", entityType, entityIds.size());
var statement = connection.createStatement(DELETE_FEEDBACK_SCORE_BY_ENTITY_IDS)
.bind("entity_ids", entityIds.toArray(UUID[]::new))
.bind("entity_type", entityType.getType());

return makeMonoContextAware(bindWorkspaceIdToMono(statement))
.flatMap(result -> Mono.from(result.getRowsUpdated()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
import com.comet.opik.api.FeedbackScore;
import com.comet.opik.api.FeedbackScoreBatchItem;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.factory.Mappers;

import java.util.UUID;

@Mapper
public interface FeedbackScoreMapper {

FeedbackScoreMapper INSTANCE = Mappers.getMapper(FeedbackScoreMapper.class);

FeedbackScore toFeedbackScore(FeedbackScoreBatchItem feedbackScoreBatchItem);

@Mapping(target = "id", source = "entityId")
FeedbackScoreBatchItem toFeedbackScoreBatchItem(UUID entityId, String projectName, FeedbackScore feedbackScore);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.reactivestreams.Publisher;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -96,7 +97,7 @@ INSERT INTO spans(
* This query handles the insertion of a new span into the database in two cases:
* 1. When the span does not exist in the database.
* 2. When the span exists in the database but the provided span has different values for the fields such as end_time, input, output, metadata and tags.
* **/
**/
//TODO: refactor to implement proper conflict resolution
private static final String INSERT = """
INSERT INTO spans(
Expand Down Expand Up @@ -268,14 +269,13 @@ INSERT INTO spans (

/**
* This query is used when updates are processed before inserts, and the span does not exist in the database.
*
* <p>
* The query will insert/update a new span with the provided values such as end_time, input, output, metadata, tags etc.
* In case the values are not provided, the query will use the default values such value are interpreted in other queries as null.
*
* <p>
* This happens because the query is used in a patch endpoint which allows partial updates, so the query will update only the provided fields.
* The remaining fields will be updated/inserted once the POST arrives with the all mandatory fields to create the trace.
*
* */
*/
//TODO: refactor to implement proper conflict resolution
private static final String PARTIAL_INSERT = """
INSERT INTO spans(
Expand Down Expand Up @@ -466,8 +466,8 @@ AND id in (
;
""";

private static final String DELETE_BY_TRACE_ID = """
DELETE FROM spans WHERE trace_id = :trace_id AND workspace_id = :workspace_id;
private static final String DELETE_BY_TRACE_IDS = """
DELETE FROM spans WHERE trace_id IN :trace_ids AND workspace_id = :workspace_id;
""";

private static final String SELECT_SPAN_ID_AND_WORKSPACE = """
Expand Down Expand Up @@ -745,15 +745,20 @@ private Publisher<? extends Result> getById(UUID id, Connection connection) {

@Trace(dispatcher = true)
public Mono<Void> deleteByTraceId(@NonNull UUID traceId, @NonNull Connection connection) {
Statement statement = connection.createStatement(DELETE_BY_TRACE_ID)
.bind("trace_id", traceId);

Segment segment = startSegment("spans", "Clickhouse", "delete_by_trace_id");
return deleteByTraceIds(Set.of(traceId), connection);
}

@Trace(dispatcher = true)
public Mono<Void> deleteByTraceIds(Set<UUID> traceIds, @NonNull Connection connection) {
Preconditions.checkArgument(
CollectionUtils.isNotEmpty(traceIds), "Argument 'traceIds' must not be empty");
log.info("Deleting spans by traceIds, count '{}'", traceIds.size());
var statement = connection.createStatement(DELETE_BY_TRACE_IDS)
.bind("trace_ids", traceIds);
var segment = startSegment("spans", "Clickhouse", "delete_by_trace_id");
return makeMonoContextAware(bindWorkspaceIdToMono(statement))
.doFinally(signalType -> segment.end())
.then();

}

private Publisher<Span> mapToDto(Result result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.reactivestreams.Publisher;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -52,12 +53,13 @@ interface TraceDAO {

Mono<Void> delete(UUID id, Connection connection);

Mono<Void> delete(Set<UUID> ids, Connection connection);

Mono<Trace> findById(UUID id, Connection connection);

Mono<TracePage> find(int size, int page, TraceSearchCriteria traceSearchCriteria, Connection connection);

Mono<Void> partialInsert(UUID projectId, TraceUpdate traceUpdate, UUID traceId,
Connection connection);
Mono<Void> partialInsert(UUID projectId, TraceUpdate traceUpdate, UUID traceId, Connection connection);

Mono<List<WorkspaceAndResourceId>> getTraceWorkspace(Set<UUID> traceIds, Connection connection);

Expand Down Expand Up @@ -108,7 +110,7 @@ INSERT INTO traces(
* This query handles the insertion of a new trace into the database in two cases:
* 1. When the trace does not exist in the database.
* 2. When the trace exists in the database but the provided trace has different values for the fields such as end_time, input, output, metadata and tags.
* **/
**/
//TODO: refactor to implement proper conflict resolution
private static final String INSERT = """
INSERT INTO traces(
Expand Down Expand Up @@ -307,7 +309,7 @@ AND id in (

private static final String DELETE_BY_ID = """
DELETE FROM traces
WHERE id = :id
WHERE id IN :ids
AND workspace_id = :workspace_id
;
""";
Expand All @@ -323,15 +325,14 @@ AND id in (
""";

/**
* This query is used when updates are processed before inserts, and the trace does not exist in the database.
*
* The query will insert/update a new trace with the provided values such as end_time, input, output, metadata and tags.
* In case the values are not provided, the query will use the default values such value are interpreted in other queries as null.
*
* This happens because the query is used in a patch endpoint which allows partial updates, so the query will update only the provided fields.
* The remaining fields will be updated/inserted once the POST arrives with the all mandatory fields to create the trace.
*
* */
* This query is used when updates are processed before inserts, and the trace does not exist in the database.
* <p>
* The query will insert/update a new trace with the provided values such as end_time, input, output, metadata and tags.
* In case the values are not provided, the query will use the default values such value are interpreted in other queries as null.
* <p>
* This happens because the query is used in a patch endpoint which allows partial updates, so the query will update only the provided fields.
* The remaining fields will be updated/inserted once the POST arrives with the all mandatory fields to create the trace.
*/
//TODO: refactor to implement proper conflict resolution
private static final String INSERT_UPDATE = """
INSERT INTO traces (
Expand Down Expand Up @@ -505,8 +506,7 @@ private Mono<? extends Result> update(UUID id, TraceUpdate traceUpdate, Connecti
.doFinally(signalType -> endSegment(segment));
}

private Statement createUpdateStatement(UUID id, TraceUpdate traceUpdate, Connection connection,
String sql) {
private Statement createUpdateStatement(UUID id, TraceUpdate traceUpdate, Connection connection, String sql) {
Statement statement = connection.createStatement(sql);

bindUpdateParams(traceUpdate, statement);
Expand Down Expand Up @@ -566,11 +566,16 @@ private Flux<? extends Result> getById(UUID id, Connection connection) {
@Override
@com.newrelic.api.agent.Trace(dispatcher = true)
public Mono<Void> delete(@NonNull UUID id, @NonNull Connection connection) {
var statement = connection.createStatement(DELETE_BY_ID)
.bind("id", id);

Segment segment = startSegment("traces", "Clickhouse", "delete");
return delete(Set.of(id), connection);
}

@Override
public Mono<Void> delete(Set<UUID> ids, @NonNull Connection connection) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(ids), "Argument 'ids' must not be empty");
log.info("Deleting traces, count '{}'", ids.size());
var statement = connection.createStatement(DELETE_BY_ID)
.bind("ids", ids.toArray(UUID[]::new));
var segment = startSegment("traces", "Clickhouse", "delete");
return makeMonoContextAware(bindWorkspaceIdToMono(statement))
.doFinally(signalType -> endSegment(segment))
.then();
Expand Down
Loading