Skip to content

Commit

Permalink
Merge branch 'main' into thiagohora/use_opentelemetry_instead_of_new_…
Browse files Browse the repository at this point in the history
…relic
  • Loading branch information
thiagohora authored Sep 17, 2024
2 parents 1363001 + e715f5c commit 6dc523c
Show file tree
Hide file tree
Showing 64 changed files with 1,989 additions and 1,139 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ For more information about the different deployment options, please see our depl

| Installation methods | Docs link |
| ------------------- | --------- |
| Local instance | [![All-in-one installation](https://img.shields.io/badge/All--in--one%20Installer-%230db7ed)](https://www.comet.com/docs/opik/self-host/self_hosting_opik/#all-in-one-installation?utm_source=opik&utm_medium=github&utm_content=self_host_link)
| Kubernetes | [![Kubernetes](https://img.shields.io/badge/kubernetes-%23326ce5.svg?&logo=kubernetes&logoColor=white)](https://www.comet.com/docs/opik/self-host/self_hosting_opik/#kubernetes-installation?utm_source=opik&utm_medium=github&utm_content=kubernetes_link)
| Local instance | [![Local Deployment](https://img.shields.io/badge/Local%20Deployments-%232496ED?style=flat&logo=docker&logoColor=white)](https://www.comet.com/docs/opik/self-host/local_deployment?utm_source=opik&utm_medium=github&utm_content=self_host_link)
| Kubernetes | [![Kubernetes](https://img.shields.io/badge/Kubernetes-%23326ce5.svg?&logo=kubernetes&logoColor=white)](https://www.comet.com/docs/opik/self-host/kubernetes/#kubernetes-installation?utm_source=opik&utm_medium=github&utm_content=kubernetes_link)


## 🏁 Get Started
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.List;

/**
 * Request payload carrying a batch of spans to be written in a single call.
 * A batch must contain between 1 and 1000 spans; each span is validated individually.
 */
@Builder(toBuilder = true)
public record SpanBatch(
        @NotNull @Size(min = 1, max = 1000) @JsonView({Span.View.Write.class}) @Valid List<Span> spans) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.List;

/**
 * Request payload carrying a batch of traces to be written in a single call.
 * A batch must contain between 1 and 1000 traces; each trace is validated individually.
 */
@Builder(toBuilder = true)
public record TraceBatch(
        @NotNull @Size(min = 1, max = 1000) @JsonView({Trace.View.Write.class}) @Valid List<Trace> traces) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.Set;
import java.util.UUID;

/**
 * Request payload listing the ids of the traces to delete.
 * A request must carry between 1 and 1000 ids; unknown JSON properties are ignored
 * and property names are (de)serialized in snake_case.
 */
@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record TracesDelete(
        @NotNull @Size(min = 1, max = 1000) Set<UUID> ids) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.TracesDelete;
import com.comet.opik.api.filter.FiltersFactory;
import com.comet.opik.api.filter.TraceFilter;
import com.comet.opik.domain.FeedbackScoreService;
Expand Down Expand Up @@ -60,7 +61,7 @@
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @jakarta.inject.Inject)
@Tag(name = "Traces", description = "Trace related resources")
public class TraceResource {
public class TracesResource {

private final @NonNull TraceService service;
private final @NonNull FeedbackScoreService feedbackScoreService;
Expand Down Expand Up @@ -206,6 +207,20 @@ public Response deleteById(@PathParam("id") UUID id) {
return Response.noContent().build();
}

/**
 * Bulk-deletes the traces whose ids are listed in the request body.
 * Always responds 204 No Content once the delete pipeline completes.
 */
@POST
@Path("/delete")
@Operation(operationId = "deleteTraces", summary = "Delete traces", description = "Delete traces", responses = {
        @ApiResponse(responseCode = "204", description = "No Content")})
public Response deleteTraces(
        @RequestBody(content = @Content(schema = @Schema(implementation = TracesDelete.class))) @NotNull @Valid TracesDelete request) {
    var ids = request.ids();
    log.info("Deleting traces, count '{}'", ids.size());
    // This JAX-RS endpoint is synchronous, so block on the reactive pipeline after
    // propagating the per-request context (workspace, user) into the Reactor context.
    service.delete(ids)
            .contextWrite(ctx -> setRequestContext(ctx, requestContext))
            .block();
    log.info("Deleted traces, count '{}'", ids.size());
    return Response.noContent().build();
}

@PUT
@Path("/{id}/feedback-scores")
@Operation(operationId = "addTraceFeedbackScore", summary = "Add trace feedback score", description = "Add trace feedback score", responses = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
import com.comet.opik.api.FeedbackScore;
import com.comet.opik.api.FeedbackScoreBatchItem;
import com.comet.opik.api.ScoreSource;
import com.google.common.base.Preconditions;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.Statement;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Flux;
Expand All @@ -24,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -52,14 +55,16 @@ enum EntityType {

Mono<Void> deleteScoreFrom(EntityType entityType, UUID id, String name, Connection connection);

Mono<Void> deleteByEntityId(EntityType entityType, UUID id, Connection connection);
Mono<Void> deleteByEntityId(EntityType entityType, UUID entityId, Connection connection);

Mono<Long> scoreBatchOf(EntityType entityType, List<FeedbackScoreBatchItem> scores, Connection connection);
Mono<Void> deleteByEntityIds(EntityType entityType, Set<UUID> entityIds, Connection connection);

Mono<Long> scoreBatchOf(EntityType entityType, List<FeedbackScoreBatchItem> scores, Connection connection);
}

@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Slf4j
class FeedbackScoreDAOImpl implements FeedbackScoreDAO {

record FeedbackScoreDto(UUID entityId, FeedbackScore score) {
Expand Down Expand Up @@ -230,21 +235,21 @@ LEFT JOIN (
;
""";

private static final String DELETE_SPAN_CASCADE_FEEDBACK_SCORE = """
private static final String DELETE_SPANS_CASCADE_FEEDBACK_SCORE = """
DELETE FROM feedback_scores
WHERE entity_type = 'span'
AND entity_id IN (
SELECT id
FROM spans
WHERE trace_id = :trace_id
WHERE trace_id IN :trace_ids
)
AND workspace_id = :workspace_id
;
""";

private static final String DELETE_FEEDBACK_SCORE_BY_ENTITY_ID = """
private static final String DELETE_FEEDBACK_SCORE_BY_ENTITY_IDS = """
DELETE FROM feedback_scores
WHERE entity_id = :entity_id
WHERE entity_id IN :entity_ids
AND entity_type = :entity_type
AND workspace_id = :workspace_id
;
Expand Down Expand Up @@ -410,30 +415,39 @@ public Mono<Void> deleteScoreFrom(EntityType entityType, UUID id, String name, C
}

@Override
public Mono<Void> deleteByEntityId(@NonNull EntityType entityType, @NonNull UUID id,
@NonNull Connection connection) {
public Mono<Void> deleteByEntityId(
@NonNull EntityType entityType, @NonNull UUID entityId, @NonNull Connection connection) {
return deleteByEntityIds(entityType, Set.of(entityId), connection);
}

@Override
public Mono<Void> deleteByEntityIds(
@NonNull EntityType entityType, Set<UUID> entityIds, @NonNull Connection connection) {
Preconditions.checkArgument(
CollectionUtils.isNotEmpty(entityIds), "Argument 'entityIds' must not be empty");
log.info("Deleting feedback scores for entityType '{}', entityIds count '{}'", entityType, entityIds.size());
return switch (entityType) {
case TRACE -> cascadeSpanDelete(id, connection)
case TRACE -> cascadeSpanDelete(entityIds, connection)
.flatMap(result -> Mono.from(result.getRowsUpdated()))
.then(Mono.defer(() -> deleteScoresByEntityId(entityType, id, connection)))
.then(Mono.defer(() -> deleteScoresByEntityIds(entityType, entityIds, connection)))
.then();
case SPAN -> deleteScoresByEntityId(entityType, id, connection)
case SPAN -> deleteScoresByEntityIds(entityType, entityIds, connection)
.then();
};
}

private Mono<? extends Result> cascadeSpanDelete(UUID id, Connection connection) {
var statement = connection.createStatement(DELETE_SPAN_CASCADE_FEEDBACK_SCORE)
.bind("trace_id", id);

private Mono<? extends Result> cascadeSpanDelete(Set<UUID> traceIds, Connection connection) {
log.info("Deleting feedback scores by span entityId, traceIds count '{}'", traceIds.size());
var statement = connection.createStatement(DELETE_SPANS_CASCADE_FEEDBACK_SCORE)
.bind("trace_ids", traceIds.toArray(UUID[]::new));
return makeMonoContextAware(bindWorkspaceIdToMono(statement));
}

private Mono<Long> deleteScoresByEntityId(EntityType entityType, UUID id, Connection connection) {
Statement statement = connection.createStatement(DELETE_FEEDBACK_SCORE_BY_ENTITY_ID)
.bind("entity_id", id)
private Mono<Long> deleteScoresByEntityIds(EntityType entityType, Set<UUID> entityIds, Connection connection) {
log.info("Deleting feedback scores by entityType '{}', entityIds count '{}'", entityType, entityIds.size());
var statement = connection.createStatement(DELETE_FEEDBACK_SCORE_BY_ENTITY_IDS)
.bind("entity_ids", entityIds.toArray(UUID[]::new))
.bind("entity_type", entityType.getType());

return makeMonoContextAware(bindWorkspaceIdToMono(statement))
.flatMap(result -> Mono.from(result.getRowsUpdated()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
import com.comet.opik.api.FeedbackScore;
import com.comet.opik.api.FeedbackScoreBatchItem;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.factory.Mappers;

import java.util.UUID;

// MapStruct mapper translating between FeedbackScore and FeedbackScoreBatchItem.
// The implementation class is generated at build time by the MapStruct annotation processor.
@Mapper
public interface FeedbackScoreMapper {

    // Shared singleton of the generated implementation.
    FeedbackScoreMapper INSTANCE = Mappers.getMapper(FeedbackScoreMapper.class);

    // Maps same-named fields from the batch item onto a FeedbackScore (MapStruct default mapping).
    FeedbackScore toFeedbackScore(FeedbackScoreBatchItem feedbackScoreBatchItem);

    // Builds a batch item from an existing score; the 'entityId' argument becomes the item's 'id'.
    @Mapping(target = "id", source = "entityId")
    FeedbackScoreBatchItem toFeedbackScoreBatchItem(UUID entityId, String projectName, FeedbackScore feedbackScore);
}
29 changes: 17 additions & 12 deletions apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.reactivestreams.Publisher;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -96,7 +97,7 @@ INSERT INTO spans(
* This query handles the insertion of a new span into the database in two cases:
* 1. When the span does not exist in the database.
* 2. When the span exists in the database but the provided span has different values for the fields such as end_time, input, output, metadata and tags.
* **/
**/
//TODO: refactor to implement proper conflict resolution
private static final String INSERT = """
INSERT INTO spans(
Expand Down Expand Up @@ -268,14 +269,13 @@ INSERT INTO spans (

/**
* This query is used when updates are processed before inserts, and the span does not exist in the database.
*
* <p>
* The query will insert/update a new span with the provided values such as end_time, input, output, metadata, tags etc.
* In case the values are not provided, the query will use the default values such value are interpreted in other queries as null.
*
* <p>
* This happens because the query is used in a patch endpoint which allows partial updates, so the query will update only the provided fields.
* The remaining fields will be updated/inserted once the POST arrives with the all mandatory fields to create the trace.
*
* */
*/
//TODO: refactor to implement proper conflict resolution
private static final String PARTIAL_INSERT = """
INSERT INTO spans(
Expand Down Expand Up @@ -466,8 +466,8 @@ AND id in (
;
""";

private static final String DELETE_BY_TRACE_ID = """
DELETE FROM spans WHERE trace_id = :trace_id AND workspace_id = :workspace_id;
private static final String DELETE_BY_TRACE_IDS = """
DELETE FROM spans WHERE trace_id IN :trace_ids AND workspace_id = :workspace_id;
""";

private static final String SELECT_SPAN_ID_AND_WORKSPACE = """
Expand Down Expand Up @@ -745,15 +745,20 @@ private Publisher<? extends Result> getById(UUID id, Connection connection) {

@WithSpan
public Mono<Void> deleteByTraceId(@NonNull UUID traceId, @NonNull Connection connection) {
Statement statement = connection.createStatement(DELETE_BY_TRACE_ID)
.bind("trace_id", traceId);

Segment segment = startSegment("spans", "Clickhouse", "delete_by_trace_id");
return deleteByTraceIds(Set.of(traceId), connection);
}

/**
 * Deletes all spans belonging to the given trace ids within the current workspace.
 *
 * @param traceIds   ids of the traces whose spans are removed; must be non-empty
 * @param connection active database connection the delete statement runs on
 * @throws IllegalArgumentException if {@code traceIds} is null or empty
 */
// Fix 1: use the OpenTelemetry @WithSpan annotation (as deleteByTraceId above does)
// instead of the leftover New Relic @Trace(dispatcher = true) annotation.
@WithSpan
public Mono<Void> deleteByTraceIds(Set<UUID> traceIds, @NonNull Connection connection) {
    Preconditions.checkArgument(
            CollectionUtils.isNotEmpty(traceIds), "Argument 'traceIds' must not be empty");
    log.info("Deleting spans by traceIds, count '{}'", traceIds.size());
    // Fix 2: bind the ids as a UUID[] so the driver expands the IN (:trace_ids) list —
    // consistent with the array binding used for the IN-list deletes in FeedbackScoreDAO.
    var statement = connection.createStatement(DELETE_BY_TRACE_IDS)
            .bind("trace_ids", traceIds.toArray(UUID[]::new));
    var segment = startSegment("spans", "Clickhouse", "delete_by_trace_id");
    return makeMonoContextAware(bindWorkspaceIdToMono(statement))
            .doFinally(signalType -> endSegment(segment))
            .then();
}

private Publisher<Span> mapToDto(Result result) {
Expand Down
Loading

0 comments on commit 6dc523c

Please sign in to comment.