Skip to content

Commit

Permalink
Merge branch 'main' into thiagohora/add_logs
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora authored Sep 13, 2024
2 parents de49b84 + 501aacd commit 712d1a3
Show file tree
Hide file tree
Showing 22 changed files with 970 additions and 300 deletions.
12 changes: 12 additions & 0 deletions apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonView;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;

import java.util.List;

/**
 * Request payload carrying a batch of spans to create.
 * The batch must contain between 1 and 1000 spans, each validated individually.
 */
public record SpanBatch(
        @NotNull @Size(min = 1, max = 1000) @JsonView({Span.View.Write.class}) @Valid List<Span> spans) {
}
12 changes: 12 additions & 0 deletions apps/opik-backend/src/main/java/com/comet/opik/api/TraceBatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonView;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;

import java.util.List;

/**
 * Request payload carrying a batch of traces to create.
 * The batch must contain between 1 and 1000 traces, each validated individually.
 */
public record TraceBatch(
        @NotNull @Size(min = 1, max = 1000) @JsonView({Trace.View.Write.class}) @Valid List<Trace> traces) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.comet.opik.api.FeedbackScore;
import com.comet.opik.api.FeedbackScoreBatch;
import com.comet.opik.api.Span;
import com.comet.opik.api.SpanBatch;
import com.comet.opik.api.SpanSearchCriteria;
import com.comet.opik.api.SpanUpdate;
import com.comet.opik.api.filter.FiltersFactory;
Expand All @@ -27,6 +28,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.ClientErrorException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
Expand All @@ -47,6 +49,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.stream.Collectors;

import static com.comet.opik.api.Span.SpanPage;
import static com.comet.opik.api.Span.View;
Expand Down Expand Up @@ -142,6 +145,30 @@ public Response create(
return Response.created(uri).build();
}

@POST
@Path("/batch")
@Operation(operationId = "createSpans", summary = "Create spans", description = "Create spans", responses = {
        @ApiResponse(responseCode = "204", description = "No Content")})
public Response createSpans(
        @RequestBody(content = @Content(schema = @Schema(implementation = SpanBatch.class))) @JsonView(Span.View.Write.class) @NotNull @Valid SpanBatch spans) {

    // Reject batches that contain the same client-supplied span id more than once.
    // Count occurrences per id rather than grouping into List<Span> buckets — the
    // lists were only ever used for their size.
    spans.spans()
            .stream()
            .map(Span::id)
            .filter(id -> id != null) // spans without an id cannot collide
            .collect(Collectors.groupingBy(id -> id, Collectors.counting()))
            .forEach((id, count) -> {
                if (count > 1) {
                    throw new ClientErrorException("Duplicate span id '%s'".formatted(id), 422);
                }
            });

    // Blocking here keeps the resource method synchronous while the service is reactive.
    spanService.create(spans)
            .contextWrite(ctx -> setRequestContext(ctx, requestContext))
            .block();

    return Response.noContent().build();
}

@PATCH
@Path("{id}")
@Operation(operationId = "updateSpan", summary = "Update span by id", description = "Update span by id", responses = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.comet.opik.api.FeedbackScoreBatch;
import com.comet.opik.api.Trace;
import com.comet.opik.api.Trace.TracePage;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.filter.FiltersFactory;
Expand All @@ -26,6 +27,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.ClientErrorException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
Expand All @@ -46,6 +48,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.stream.Collectors;

import static com.comet.opik.utils.AsyncUtils.setRequestContext;
import static com.comet.opik.utils.ValidationUtils.validateProjectNameAndProjectId;
Expand Down Expand Up @@ -142,6 +145,30 @@ public Response create(
return Response.created(uri).build();
}

@POST
@Path("/batch")
@Operation(operationId = "createTraces", summary = "Create traces", description = "Create traces", responses = {
        @ApiResponse(responseCode = "204", description = "No Content")})
// NOTE(review): the method name looks copy-pasted from SpansResource — the operationId says
// "createTraces". Renaming is safe for JAX-RS (dispatch is annotation-driven, not name-driven);
// kept as-is here to avoid an interface change, but it should be renamed to createTraces.
public Response createSpans(
        @RequestBody(content = @Content(schema = @Schema(implementation = TraceBatch.class))) @JsonView(Trace.View.Write.class) @NotNull @Valid TraceBatch traces) {

    // Reject batches that contain the same client-supplied trace id more than once.
    // Count occurrences per id rather than grouping into List<Trace> buckets — the
    // lists were only ever used for their size.
    traces.traces()
            .stream()
            .map(Trace::id)
            .filter(id -> id != null) // traces without an id cannot collide
            .collect(Collectors.groupingBy(id -> id, Collectors.counting()))
            .forEach((id, count) -> {
                if (count > 1) {
                    throw new ClientErrorException("Duplicate trace id '%s'".formatted(id), 422);
                }
            });

    // Blocking here keeps the resource method synchronous while the service is reactive.
    service.create(traces)
            .contextWrite(ctx -> setRequestContext(ctx, requestContext))
            .block();

    return Response.noContent().build();
}

@PATCH
@Path("{id}")
@Operation(operationId = "updateTrace", summary = "Update trace by id", description = "Update trace by id", responses = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

@ImplementedBy(FeedbackScoreServiceImpl.class)
public interface FeedbackScoreService {
Flux<FeedbackScore> getScores(EntityType entityType, UUID entityId);

Mono<Void> scoreTrace(UUID traceId, FeedbackScore score);
Mono<Void> scoreSpan(UUID spanId, FeedbackScore score);
Expand Down Expand Up @@ -66,12 +65,6 @@ class FeedbackScoreServiceImpl implements FeedbackScoreService {
record ProjectDto(Project project, List<FeedbackScoreBatchItem> scores) {
}

@Override
public Flux<FeedbackScore> getScores(@NonNull EntityType entityType, @NonNull UUID entityId) {
    // Query the DAO for this single entity, then flatten its entry in the
    // per-entity scores map into a stream of individual feedback scores.
    return asyncTemplate
            .nonTransaction(connection -> dao.getScores(entityType, List.of(entityId), connection))
            .flatMapIterable(scoresByEntity -> scoresByEntity.get(entityId));
}

@Override
public Mono<Void> scoreTrace(@NonNull UUID traceId, @NonNull FeedbackScore score) {
return lockService.executeWithLock(
Expand Down
120 changes: 119 additions & 1 deletion apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.comet.opik.domain.filter.FilterQueryBuilder;
import com.comet.opik.domain.filter.FilterStrategy;
import com.comet.opik.utils.JsonUtils;
import com.comet.opik.utils.TemplateUtils;
import com.google.common.base.Preconditions;
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.Trace;
import io.r2dbc.spi.Connection;
Expand Down Expand Up @@ -36,15 +38,60 @@
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToMono;
import static com.comet.opik.domain.FeedbackScoreDAO.EntityType;
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment;
import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware;
import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware;
import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder;

@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Slf4j
class SpanDAO {

/**
 * StringTemplate (ST) bulk-insert statement. The {@code <items:{item | ...}>} loop renders one
 * VALUES tuple per batch entry; every bind parameter is suffixed with the item's index
 * ({@code :id0, :id1, ...}), while {@code :workspace_id} is bound once and shared by all rows.
 * Timestamps are bound as strings and parsed with parseDateTime64BestEffort at precision 9
 * (nanoseconds); usage is bound as parallel key/value arrays and rebuilt with mapFromArrays.
 * Must stay in sync with the bind calls in {@code insert(List<Span>, Connection)}.
 */
private static final String BULK_INSERT = """
INSERT INTO spans(
id,
project_id,
workspace_id,
trace_id,
parent_span_id,
name,
type,
start_time,
end_time,
input,
output,
metadata,
tags,
usage,
created_by,
last_updated_by
) VALUES
<items:{item |
(
:id<item.index>,
:project_id<item.index>,
:workspace_id,
:trace_id<item.index>,
:parent_span_id<item.index>,
:name<item.index>,
:type<item.index>,
parseDateTime64BestEffort(:start_time<item.index>, 9),
if(:end_time<item.index> IS NULL, NULL, parseDateTime64BestEffort(:end_time<item.index>, 9)),
:input<item.index>,
:output<item.index>,
:metadata<item.index>,
:tags<item.index>,
mapFromArrays(:usage_keys<item.index>, :usage_values<item.index>),
:created_by<item.index>,
:last_updated_by<item.index>
)
<if(item.hasNext)>,<endif>
}>
;
""";

/**
* This query handles the insertion of a new span into the database in two cases:
* 1. When the span does not exist in the database.
Expand Down Expand Up @@ -444,6 +491,78 @@ public Mono<Void> insert(@NonNull Span span) {
.then();
}

/**
 * Inserts all given spans in a single bulk statement and returns the total number
 * of rows written. The list must not be empty.
 */
@Trace(dispatcher = true)
public Mono<Long> batchInsert(@NonNull List<Span> spans) {
    Preconditions.checkArgument(!spans.isEmpty(), "Spans list must not be empty");

    // Open a connection, run the bulk insert, then sum the per-result row counts.
    var results = Mono.from(connectionFactory.create())
            .flatMapMany(connection -> insert(spans, connection));

    return results
            .flatMap(Result::getRowsUpdated)
            .reduce(0L, Long::sum);
}

/**
 * Renders the BULK_INSERT template for {@code spans.size()} rows and binds every span's
 * columns using index-suffixed parameter names ({@code id0, id1, ...}) that must match the
 * template's placeholders. The workspace id is bound once for all rows; user name and
 * workspace are taken from the reactive context. Execution is wrapped in a New Relic segment.
 */
private Publisher<? extends Result> insert(List<Span> spans, Connection connection) {

    return makeMonoContextAware((userName, workspaceName, workspaceId) -> {
        // One QueryItem per span drives the template's <items:{...}> loop.
        List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(spans.size());

        var template = new ST(BULK_INSERT)
                .add("items", queryItems);

        Statement statement = connection.createStatement(template.render());

        // i is the row index and must match the <item.index> suffix rendered by the template.
        int i = 0;
        for (Span span : spans) {

            statement.bind("id" + i, span.id())
                    .bind("project_id" + i, span.projectId())
                    .bind("trace_id" + i, span.traceId())
                    .bind("name" + i, span.name())
                    .bind("type" + i, span.type().toString())
                    .bind("start_time" + i, span.startTime().toString())
                    // NOTE(review): root spans bind an empty string, not NULL — confirm readers
                    // treat '' as "no parent".
                    .bind("parent_span_id" + i, span.parentSpanId() != null ? span.parentSpanId() : "")
                    .bind("input" + i, span.input() != null ? span.input().toString() : "")
                    .bind("output" + i, span.output() != null ? span.output().toString() : "")
                    .bind("metadata" + i, span.metadata() != null ? span.metadata().toString() : "")
                    .bind("tags" + i, span.tags() != null ? span.tags().toArray(String[]::new) : new String[]{})
                    .bind("created_by" + i, userName)
                    .bind("last_updated_by" + i, userName);

            // end_time is optional: bind a typed NULL so the template's
            // if(:end_time IS NULL, ...) branch applies.
            if (span.endTime() != null) {
                statement.bind("end_time" + i, span.endTime().toString());
            } else {
                statement.bindNull("end_time" + i, String.class);
            }

            // usage is sent as parallel key/value arrays for ClickHouse mapFromArrays().
            // A single entry-wise pass guarantees keys[j] corresponds to values[j].
            if (span.usage() != null) {
                Stream.Builder<String> keys = Stream.builder();
                Stream.Builder<Integer> values = Stream.builder();

                span.usage().forEach((key, value) -> {
                    keys.add(key);
                    values.add(value);
                });

                statement.bind("usage_keys" + i, keys.build().toArray(String[]::new));
                statement.bind("usage_values" + i, values.build().toArray(Integer[]::new));
            } else {
                // Empty arrays yield an empty map, keeping the column non-null.
                statement.bind("usage_keys" + i, new String[]{});
                statement.bind("usage_values" + i, new Integer[]{});
            }

            i++;
        }

        // Shared across all rows; the template references :workspace_id without an index.
        statement.bind("workspace_id", workspaceId);

        Segment segment = startSegment("spans", "Clickhouse", "batch_insert");

        return Mono.from(statement.execute())
                .doFinally(signalType -> endSegment(segment));
    });
}

private Publisher<? extends Result> insert(Span span, Connection connection) {
var template = newInsertTemplate(span);
var statement = connection.createStatement(template.render())
Expand Down Expand Up @@ -788,5 +907,4 @@ public Mono<List<WorkspaceAndResourceId>> getSpanWorkspace(@NonNull Set<UUID> sp
row.get("id", UUID.class))))
.collectList();
}

}
Loading

0 comments on commit 712d1a3

Please sign in to comment.