diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java index 6071a17f7c..4bb996413c 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java @@ -15,6 +15,7 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -65,4 +66,18 @@ public static class Write { public static class Public { } } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Span span = (Span) o; + return Objects.equals(id, span.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java b/apps/opik-backend/src/main/java/com/comet/opik/api/SpanBulk.java similarity index 57% rename from apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java rename to apps/opik-backend/src/main/java/com/comet/opik/api/SpanBulk.java index f422eb7d74..5bf49a8074 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/SpanBulk.java @@ -5,8 +5,8 @@ import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; -import java.util.List; +import java.util.Set; -public record SpanBatch(@NotNull @Size(min = 1, max = 2000) @JsonView( { - Span.View.Write.class}) @Valid List spans){ +public record SpanBulk(@NotNull @Size(min = 1, max = 1000) @JsonView( { + Span.View.Write.class}) @Valid Set spans){ } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java index a1fa71bb6f..aec5ba585d 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java @@ -5,7 +5,7 @@ import com.comet.opik.api.FeedbackScore; import com.comet.opik.api.FeedbackScoreBatch; import com.comet.opik.api.Span; -import com.comet.opik.api.SpanBatch; +import com.comet.opik.api.SpanBulk; import com.comet.opik.api.SpanSearchCriteria; import com.comet.opik.api.SpanUpdate; import com.comet.opik.api.filter.FiltersFactory; @@ -134,11 +134,11 @@ public Response create( } @POST - @Path("/batch") + @Path("/bulk") @Operation(operationId = "createSpans", summary = "Create spans", description = "Create spans", responses = { @ApiResponse(responseCode = "204", description = "No Content")}) public Response createSpans( - @RequestBody(content = @Content(schema = @Schema(implementation = SpanBatch.class))) @JsonView(Span.View.Write.class) @NotNull @Valid SpanBatch spans) { + @RequestBody(content = @Content(schema = @Schema(implementation = SpanBulk.class))) @JsonView(Span.View.Write.class) @NotNull @Valid SpanBulk spans) { spanService.create(spans) .contextWrite(ctx -> setRequestContext(ctx, requestContext)) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java index 4f3b075ed5..dfe1b2d707 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java @@ -7,6 +7,7 @@ import com.comet.opik.domain.filter.FilterStrategy; import com.comet.opik.utils.JsonUtils; import com.comet.opik.utils.TemplateUtils; +import com.google.common.base.Preconditions; import com.newrelic.api.agent.Segment; import com.newrelic.api.agent.Trace; import io.r2dbc.spi.Connection; @@ -25,7 +26,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -49,7 +49,7 @@ @Slf4j class SpanDAO { - private static final String PLAIN_INSERT = """ + private static final String BULK_INSERT = """ INSERT INTO spans( id, project_id, @@ -494,27 +494,25 @@ public Mono insert(@NonNull Span span) { } @Trace(dispatcher = true) - public Mono batchInsert(@NonNull List spans) { - return Mono.from(connectionFactory.create()) - .flatMapMany(connection -> { - if (spans.isEmpty()) { - return Mono.just(0L); - } + public Mono bulkInsert(@NonNull List spans) { - return insert(spans, connection); - }) - .then(); + Preconditions.checkArgument(!spans.isEmpty(), "Spans list must not be empty"); + + return Mono.from(connectionFactory.create()) + .flatMapMany(connection -> insert(spans, connection)) + .flatMap(Result::getRowsUpdated) + .reduce(0L, Long::sum); } - private Publisher insert(Collection spans, Connection connection) { + private Publisher insert(List spans, Connection connection) { return makeMonoContextAware((userName, workspaceName, workspaceId) -> { List queryItems = getQueryItemPlaceHolder(spans.size()); - var template = new ST(PLAIN_INSERT) + var template = new ST(BULK_INSERT) .add("items", queryItems); - var statement = connection.createStatement(template.render()); + Statement statement = connection.createStatement(template.render()); int i = 0; for (Span span : spans) { @@ -524,40 +522,22 @@ private Publisher insert(Collection spans, Connection co .bind("trace_id" + i, span.traceId()) .bind("name" + i, span.name()) .bind("type" + i, span.type().toString()) - .bind("start_time" + i, span.startTime().toString()); + .bind("start_time" + i, span.startTime().toString()) + .bind("parent_span_id" + i, span.parentSpanId() != null ? span.parentSpanId() : "") + .bind("input" + i, span.input() != null ? span.input().toString() : "") + .bind("output" + i, span.output() != null ? span.output().toString() : "") + .bind("metadata" + i, span.metadata() != null ? span.metadata().toString() : "") + .bind("tags" + i, span.tags() != null ? span.tags().toArray(String[]::new) : new String[]{}) + .bind("created_at" + i, span.createdAt().toString()) + .bind("created_by" + i, userName) + .bind("last_updated_by" + i, userName); - if (span.parentSpanId() != null) { - statement.bind("parent_span_id" + i, span.parentSpanId()); - } else { - statement.bind("parent_span_id" + i, ""); - } if (span.endTime() != null) { statement.bind("end_time" + i, span.endTime().toString()); } else { statement.bindNull("end_time" + i, String.class); } - if (span.input() != null) { - statement.bind("input" + i, span.input().toString()); - } else { - statement.bind("input" + i, ""); - } - if (span.output() != null) { - statement.bind("output" + i, span.output().toString()); - } else { - statement.bind("output" + i, ""); - } - if (span.metadata() != null) { - statement.bind("metadata" + i, span.metadata().toString()); - } else { - statement.bind("metadata" + i, ""); - } - if (span.tags() != null) { - statement.bind("tags" + i, span.tags().toArray(String[]::new)); - } else { - statement.bind("tags" + i, new String[]{}); - } - if (span.usage() != null) { Stream.Builder keys = Stream.builder(); Stream.Builder values = Stream.builder(); @@ -574,9 +554,6 @@ private Publisher insert(Collection spans, Connection co statement.bind("usage_values" + i, new Integer[]{}); } - statement.bind("created_at" + i, span.createdAt().toString()); - statement.bind("created_by" + i, userName); - statement.bind("last_updated_by" + i, userName); i++; } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java index 10ad5124ec..3e50945944 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java @@ -3,7 +3,7 @@ import com.clickhouse.client.ClickHouseException; import com.comet.opik.api.Project; import com.comet.opik.api.Span; -import com.comet.opik.api.SpanBatch; +import com.comet.opik.api.SpanBulk; import com.comet.opik.api.SpanSearchCriteria; import com.comet.opik.api.SpanUpdate; import com.comet.opik.api.error.EntityAlreadyExistsException; @@ -252,7 +252,7 @@ public Mono validateSpanWorkspace(@NonNull String workspaceId, @NonNull } @Trace(dispatcher = true) - public Mono create(SpanBatch batch) { + public Mono create(SpanBulk batch) { if (batch.spans().isEmpty()) { return Mono.empty(); @@ -276,7 +276,7 @@ public Mono create(SpanBatch batch) { return lockService.lockAll(spanIds, SPAN_KEY) .flatMap(locks -> mergeSpans(spans, spanIds) - .flatMap(spanDAO::batchInsert) + .flatMap(spanDAO::bulkInsert) .doFinally(signalType -> lockService.unlockAll(locks).subscribe()) .subscribeOn(Schedulers.boundedElastic())) .then(); @@ -326,21 +326,7 @@ private Span mergeSpans(Span currentSpan, Span receivedSpan) { throw new EntityAlreadyExistsException(new ErrorMessage(List.of(PARENT_SPAN_IS_MISMATCH))); } - return Span.builder() - .id(currentSpan.id()) - .projectId(currentSpan.projectId() != null ? currentSpan.projectId() : receivedSpan.projectId()) - .traceId(currentSpan.traceId() != null ? currentSpan.traceId() : receivedSpan.traceId()) - .parentSpanId( - currentSpan.parentSpanId() != null ? currentSpan.parentSpanId() : receivedSpan.parentSpanId()) - .name(currentSpan.name() != null ? currentSpan.name() : receivedSpan.name()) - .type(currentSpan.type() != null ? currentSpan.type() : receivedSpan.type()) - .startTime(currentSpan.startTime() != null ? currentSpan.startTime() : receivedSpan.startTime()) - .endTime(receivedSpan.endTime() != null ? receivedSpan.endTime() : currentSpan.endTime()) - .input(receivedSpan.input() != null ? receivedSpan.input() : currentSpan.input()) - .output(receivedSpan.output() != null ? receivedSpan.output() : currentSpan.output()) - .metadata(receivedSpan.metadata() != null ? receivedSpan.metadata() : currentSpan.metadata()) - .tags(receivedSpan.tags() != null ? receivedSpan.tags() : currentSpan.tags()) - .usage(receivedSpan.usage() != null ? receivedSpan.usage() : currentSpan.usage()) + return receivedSpan.toBuilder() .createdAt(getInstant(currentSpan.createdAt(), receivedSpan.createdAt())) .build(); } @@ -355,7 +341,7 @@ private Instant getInstant(Instant current, Instant received) { } } - private List bindSpanToProjectAndId(SpanBatch batch, List projects) { + private List bindSpanToProjectAndId(SpanBulk batch, List projects) { Map projectPerName = projects.stream() .collect(Collectors.toMap(Project::name, Function.identity())); diff --git a/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java index 34d391f3f2..cf36565de7 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java @@ -54,7 +54,7 @@ public String writeValueAsString(@NonNull Object value) { } } - public void writeValueAsString(ByteArrayOutputStream baos, @NonNull Object value) { + public void writeValueAsString(@NonNull ByteArrayOutputStream baos, @NonNull Object value) { try { MAPPER.writeValue(baos, value); } catch (IOException e) { diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java index ec919aab07..aaf4a57daf 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java @@ -7,7 +7,7 @@ import com.comet.opik.api.Project; import com.comet.opik.api.ScoreSource; import com.comet.opik.api.Span; -import com.comet.opik.api.SpanBatch; +import com.comet.opik.api.SpanBulk; import com.comet.opik.api.SpanUpdate; import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.api.filter.Field; @@ -3254,11 +3254,11 @@ void batch__whenCreateSpansAndThenUpdateInSameRequest__thenReturnUpdated() { void batch__whenCreateSpansWithConflicts__thenReturn409(List expectedSpans, String expectedError) { try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) - .path("batch") + .path("bulk") .request() .header(HttpHeaders.AUTHORIZATION, API_KEY) .header(WORKSPACE_HEADER, TEST_WORKSPACE) - .post(Entity.json(new SpanBatch(expectedSpans)))) { + .post(Entity.json(new SpanBulk(expectedSpans)))) { assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(409); assertThat(actualResponse.hasEntity()).isTrue(); @@ -3302,17 +3302,16 @@ public Stream batch__whenCreateSpansWithConflicts__thenReturn409() { ) ); } - } private void batchCreateAndAssert(List expectedSpans, String apiKey, String workspaceName) { try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) - .path("batch") + .path("bulk") .request() .header(HttpHeaders.AUTHORIZATION, apiKey) .header(WORKSPACE_HEADER, workspaceName) - .post(Entity.json(new SpanBatch(expectedSpans)))) { + .post(Entity.json(new SpanBulk(expectedSpans)))) { assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(204); assertThat(actualResponse.hasEntity()).isFalse();