From 7b6527cd76808e509fe603851f957e0554cbfcad Mon Sep 17 00:00:00 2001 From: Thiago dos Santos Hora Date: Fri, 13 Sep 2024 14:52:06 +0200 Subject: [PATCH] OPIK-75: Add batch spans creation endpoint (#205) * [NA] Remove creation time from sort key * OPIK-75: Add bulk spans creation endpoint * Add tests * Add code review suggestions * PR review feedback * Remove remaining dead code * Fix tests --- .../java/com/comet/opik/api/SpanBatch.java | 12 + .../api/resources/v1/priv/SpansResource.java | 27 ++ .../java/com/comet/opik/domain/SpanDAO.java | 120 +++++- .../com/comet/opik/domain/SpanService.java | 57 ++- .../instrumentation/InstrumentAsyncUtils.java | 2 + .../redis/RedissonLockService.java | 27 +- .../utils/ConditionalGZipFilter.java | 1 - .../resources/v1/priv/SpansResourceTest.java | 360 ++++++++++++------ .../comet/opik/domain/DummyLockService.java | 18 + .../comet/opik/domain/SpanServiceTest.java | 14 +- .../opik/domain/TraceServiceImplTest.java | 14 +- 11 files changed, 496 insertions(+), 156 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java create mode 100644 apps/opik-backend/src/test/java/com/comet/opik/domain/DummyLockService.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java b/apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java new file mode 100644 index 0000000000..57435ebf26 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/SpanBatch.java @@ -0,0 +1,12 @@ +package com.comet.opik.api; + +import com.fasterxml.jackson.annotation.JsonView; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; + +import java.util.List; + +public record SpanBatch(@NotNull @Size(min = 1, max = 1000) @JsonView( { + Span.View.Write.class}) @Valid List spans){ +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java 
b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java index 8d7ac2edd9..e337363f76 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java @@ -5,6 +5,7 @@ import com.comet.opik.api.FeedbackScore; import com.comet.opik.api.FeedbackScoreBatch; import com.comet.opik.api.Span; +import com.comet.opik.api.SpanBatch; import com.comet.opik.api.SpanSearchCriteria; import com.comet.opik.api.SpanUpdate; import com.comet.opik.api.filter.FiltersFactory; @@ -27,6 +28,7 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; +import jakarta.ws.rs.ClientErrorException; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DefaultValue; @@ -47,6 +49,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.UUID; +import java.util.stream.Collectors; import static com.comet.opik.utils.AsyncUtils.setRequestContext; import static com.comet.opik.utils.ValidationUtils.validateProjectNameAndProjectId; @@ -132,6 +135,30 @@ public Response create( return Response.created(uri).build(); } + @POST + @Path("/batch") + @Operation(operationId = "createSpans", summary = "Create spans", description = "Create spans", responses = { + @ApiResponse(responseCode = "204", description = "No Content")}) + public Response createSpans( + @RequestBody(content = @Content(schema = @Schema(implementation = SpanBatch.class))) @JsonView(Span.View.Write.class) @NotNull @Valid SpanBatch spans) { + + spans.spans() + .stream() + .filter(span -> span.id() != null) // Filter out spans with null IDs + .collect(Collectors.groupingBy(Span::id)) + .forEach((id, spanGroup) -> { + if (spanGroup.size() > 1) { + throw new ClientErrorException("Duplicate span id '%s'".formatted(id), 422); + } + }); + + spanService.create(spans) + .contextWrite(ctx -> 
setRequestContext(ctx, requestContext)) + .block(); + + return Response.noContent().build(); + } + @PATCH @Path("{id}") @Operation(operationId = "updateSpan", summary = "Update span by id", description = "Update span by id", responses = { diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java index b75a170307..0a6a9daa4e 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java @@ -6,6 +6,8 @@ import com.comet.opik.domain.filter.FilterQueryBuilder; import com.comet.opik.domain.filter.FilterStrategy; import com.comet.opik.utils.JsonUtils; +import com.comet.opik.utils.TemplateUtils; +import com.google.common.base.Preconditions; import com.newrelic.api.agent.Segment; import com.newrelic.api.agent.Trace; import io.r2dbc.spi.Connection; @@ -36,15 +38,60 @@ import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux; import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToMono; import static com.comet.opik.domain.FeedbackScoreDAO.EntityType; +import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment; import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment; import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware; import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware; +import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder; @Singleton @RequiredArgsConstructor(onConstructor_ = @Inject) @Slf4j class SpanDAO { + private static final String BULK_INSERT = """ + INSERT INTO spans( + id, + project_id, + workspace_id, + trace_id, + parent_span_id, + name, + type, + start_time, + end_time, + input, + output, + metadata, + tags, + usage, + created_by, + last_updated_by + ) VALUES + , + :project_id, + :workspace_id, + :trace_id, + :parent_span_id, + :name, 
+ :type, + parseDateTime64BestEffort(:start_time, 9), + if(:end_time IS NULL, NULL, parseDateTime64BestEffort(:end_time, 9)), + :input, + :output, + :metadata, + :tags, + mapFromArrays(:usage_keys, :usage_values), + :created_by, + :last_updated_by + ) + , + }> + ; + """; + /** * This query handles the insertion of a new span into the database in two cases: * 1. When the span does not exist in the database. @@ -444,6 +491,78 @@ public Mono insert(@NonNull Span span) { .then(); } + @Trace(dispatcher = true) + public Mono batchInsert(@NonNull List spans) { + + Preconditions.checkArgument(!spans.isEmpty(), "Spans list must not be empty"); + + return Mono.from(connectionFactory.create()) + .flatMapMany(connection -> insert(spans, connection)) + .flatMap(Result::getRowsUpdated) + .reduce(0L, Long::sum); + } + + private Publisher insert(List spans, Connection connection) { + + return makeMonoContextAware((userName, workspaceName, workspaceId) -> { + List queryItems = getQueryItemPlaceHolder(spans.size()); + + var template = new ST(BULK_INSERT) + .add("items", queryItems); + + Statement statement = connection.createStatement(template.render()); + + int i = 0; + for (Span span : spans) { + + statement.bind("id" + i, span.id()) + .bind("project_id" + i, span.projectId()) + .bind("trace_id" + i, span.traceId()) + .bind("name" + i, span.name()) + .bind("type" + i, span.type().toString()) + .bind("start_time" + i, span.startTime().toString()) + .bind("parent_span_id" + i, span.parentSpanId() != null ? span.parentSpanId() : "") + .bind("input" + i, span.input() != null ? span.input().toString() : "") + .bind("output" + i, span.output() != null ? span.output().toString() : "") + .bind("metadata" + i, span.metadata() != null ? span.metadata().toString() : "") + .bind("tags" + i, span.tags() != null ? 
span.tags().toArray(String[]::new) : new String[]{}) + .bind("created_by" + i, userName) + .bind("last_updated_by" + i, userName); + + if (span.endTime() != null) { + statement.bind("end_time" + i, span.endTime().toString()); + } else { + statement.bindNull("end_time" + i, String.class); + } + + if (span.usage() != null) { + Stream.Builder keys = Stream.builder(); + Stream.Builder values = Stream.builder(); + + span.usage().forEach((key, value) -> { + keys.add(key); + values.add(value); + }); + + statement.bind("usage_keys" + i, keys.build().toArray(String[]::new)); + statement.bind("usage_values" + i, values.build().toArray(Integer[]::new)); + } else { + statement.bind("usage_keys" + i, new String[]{}); + statement.bind("usage_values" + i, new Integer[]{}); + } + + i++; + } + + statement.bind("workspace_id", workspaceId); + + Segment segment = startSegment("spans", "Clickhouse", "batch_insert"); + + return Mono.from(statement.execute()) + .doFinally(signalType -> endSegment(segment)); + }); + } + private Publisher insert(Span span, Connection connection) { var template = newInsertTemplate(span); var statement = connection.createStatement(template.render()) @@ -788,5 +907,4 @@ public Mono> getSpanWorkspace(@NonNull Set sp row.get("id", UUID.class)))) .collectList(); } - } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java index 45cd6536fd..5f0c4fdf4e 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java @@ -3,6 +3,7 @@ import com.clickhouse.client.ClickHouseException; import com.comet.opik.api.Project; import com.comet.opik.api.Span; +import com.comet.opik.api.SpanBatch; import com.comet.opik.api.SpanSearchCriteria; import com.comet.opik.api.SpanUpdate; import com.comet.opik.api.error.EntityAlreadyExistsException; @@ -11,6 +12,7 @@ import 
com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.infrastructure.redis.LockService; import com.comet.opik.utils.WorkspaceUtils; +import com.google.common.base.Preconditions; import com.newrelic.api.agent.Trace; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -18,14 +20,18 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware; @@ -34,10 +40,10 @@ @Slf4j public class SpanService { - public static final String PROJECT_NAME_AND_WORKSPACE_MISMATCH = "Project name and workspace name do not match the existing span"; public static final String PARENT_SPAN_IS_MISMATCH = "parent_span_id does not match the existing span"; public static final String TRACE_ID_MISMATCH = "trace_id does not match the existing span"; public static final String SPAN_KEY = "Span"; + public static final String PROJECT_NAME_MISMATCH = "Project name and workspace name do not match the existing span"; private final @NonNull SpanDAO spanDAO; private final @NonNull ProjectService projectService; @@ -116,7 +122,7 @@ private Mono insertSpan(Span span, Project project, UUID id, Span existing } if (!project.id().equals(existingSpan.projectId())) { - return failWithConflict(PROJECT_NAME_AND_WORKSPACE_MISMATCH); + return failWithConflict(PROJECT_NAME_MISMATCH); } if (!Objects.equals(span.parentSpanId(), existingSpan.parentSpanId())) { @@ -191,7 +197,7 @@ private Mono handleSpanDBError(Throwable ex) { && (ex.getMessage().contains("_CAST(project_id, FixedString(36))") || ex.getMessage() .contains(", CAST(leftPad(workspace_id, 40, '*'), 
'FixedString(19)') ::"))) { - return failWithConflict(PROJECT_NAME_AND_WORKSPACE_MISMATCH); + return failWithConflict(PROJECT_NAME_MISMATCH); } if (ex instanceof ClickHouseException @@ -214,7 +220,7 @@ private Mono handleSpanDBError(Throwable ex) { private Mono updateOrFail(SpanUpdate spanUpdate, UUID id, Span existingSpan, Project project) { if (!project.id().equals(existingSpan.projectId())) { - return failWithConflict(PROJECT_NAME_AND_WORKSPACE_MISMATCH); + return failWithConflict(PROJECT_NAME_MISMATCH); } if (!Objects.equals(existingSpan.parentSpanId(), spanUpdate.parentSpanId())) { @@ -244,4 +250,47 @@ public Mono validateSpanWorkspace(@NonNull String workspaceId, @NonNull return spanDAO.getSpanWorkspace(spanIds) .map(spanWorkspace -> spanWorkspace.stream().allMatch(span -> workspaceId.equals(span.workspaceId()))); } + + @Trace(dispatcher = true) + public Mono create(@NonNull SpanBatch batch) { + + Preconditions.checkArgument(!batch.spans().isEmpty(), "Batch spans must not be empty"); + + List projectNames = batch.spans() + .stream() + .map(Span::projectName) + .distinct() + .toList(); + + Mono> resolveProjects = Flux.fromIterable(projectNames) + .flatMap(this::resolveProject) + .collectList() + .map(projects -> bindSpanToProjectAndId(batch, projects)) + .subscribeOn(Schedulers.boundedElastic()); + + return resolveProjects + .flatMap(spanDAO::batchInsert); + } + + private List bindSpanToProjectAndId(SpanBatch batch, List projects) { + Map projectPerName = projects.stream() + .collect(Collectors.toMap(Project::name, Function.identity())); + + return batch.spans() + .stream() + .map(span -> { + String projectName = WorkspaceUtils.getProjectName(span.projectName()); + Project project = projectPerName.get(projectName); + + UUID id = span.id() == null ? 
idGenerator.generateId() : span.id(); + IdGenerator.validateVersion(id, SPAN_KEY); + + return span.toBuilder().id(id).projectId(project.id()).build(); + }) + .toList(); + } + + private Mono resolveProject(String projectName) { + return getOrCreateProject(WorkspaceUtils.getProjectName(projectName)); + } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java index cc6f81b4df..0ba6e20572 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java @@ -3,10 +3,12 @@ import com.newrelic.api.agent.DatastoreParameters; import com.newrelic.api.agent.NewRelic; import com.newrelic.api.agent.Segment; +import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import reactor.core.scheduler.Schedulers; @Slf4j +@UtilityClass public class InstrumentAsyncUtils { public static Segment startSegment(String segmentName, String product, String operationName) { diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedissonLockService.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedissonLockService.java index ded698ca27..5ef02eac86 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedissonLockService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedissonLockService.java @@ -21,14 +21,9 @@ class RedissonLockService implements LockService { private final @NonNull DistributedLockConfig distributedLockConfig; @Override - public Mono executeWithLock(Lock lock, Mono action) { + public Mono executeWithLock(@NonNull Lock lock, @NonNull Mono action) { - RPermitExpirableSemaphoreReactive semaphore = 
redisClient.getPermitExpirableSemaphore( - CommonOptions - .name(lock.key()) - .timeout(Duration.ofMillis(distributedLockConfig.getLockTimeoutMS())) - .retryInterval(Duration.ofMillis(10)) - .retryAttempts(distributedLockConfig.getLockTimeoutMS() / 10)); + RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock, distributedLockConfig.getLockTimeoutMS()); log.debug("Trying to lock with {}", lock); @@ -43,6 +38,15 @@ public Mono executeWithLock(Lock lock, Mono action) { })); } + private RPermitExpirableSemaphoreReactive getSemaphore(Lock lock, int lockTimeoutMS) { + return redisClient.getPermitExpirableSemaphore( + CommonOptions + .name(lock.key()) + .timeout(Duration.ofMillis(lockTimeoutMS)) + .retryInterval(Duration.ofMillis(10)) + .retryAttempts(lockTimeoutMS / 10)); + } + private Mono runAction(Lock lock, Mono action, String locked) { if (locked != null) { log.debug("Lock {} acquired", lock); @@ -53,13 +57,8 @@ private Mono runAction(Lock lock, Mono action, String locked) { } @Override - public Flux executeWithLock(Lock lock, Flux stream) { - RPermitExpirableSemaphoreReactive semaphore = redisClient.getPermitExpirableSemaphore( - CommonOptions - .name(lock.key()) - .timeout(Duration.ofMillis(distributedLockConfig.getLockTimeoutMS())) - .retryInterval(Duration.ofMillis(10)) - .retryAttempts(distributedLockConfig.getLockTimeoutMS() / 10)); + public Flux executeWithLock(@NonNull Lock lock, @NonNull Flux stream) { + RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock, distributedLockConfig.getLockTimeoutMS()); return semaphore .trySetPermits(1) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/ConditionalGZipFilter.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/ConditionalGZipFilter.java index 24ce718593..8df64f0cfb 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/ConditionalGZipFilter.java +++ 
b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/ConditionalGZipFilter.java @@ -1,6 +1,5 @@ package com.comet.opik.api.resources.utils; - import com.comet.opik.utils.JsonUtils; import jakarta.ws.rs.client.ClientRequestContext; import jakarta.ws.rs.client.ClientRequestFilter; diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java index 39a92f3e5b..60db8e83ce 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java @@ -7,6 +7,7 @@ import com.comet.opik.api.Project; import com.comet.opik.api.ScoreSource; import com.comet.opik.api.Span; +import com.comet.opik.api.SpanBatch; import com.comet.opik.api.SpanUpdate; import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.api.filter.Field; @@ -160,7 +161,7 @@ private static void mockTargetWorkspace(String apiKey, String workspaceName, Str AuthTestUtils.mockTargetWorkspace(wireMock.server(), apiKey, workspaceName, workspaceId, USER); } - private UUID getProjectId(ClientSupport client, String projectName, String workspaceName, String apiKey) { + private UUID getProjectId(String projectName, String workspaceName, String apiKey) { return client.target("%s/v1/private/projects".formatted(baseURI)) .queryParam("name", projectName) .request() @@ -175,6 +176,19 @@ private UUID getProjectId(ClientSupport client, String projectName, String works .id(); } + private UUID createProject(String projectName, String workspaceName, String apiKey) { + try (Response response = client.target("%s/v1/private/projects".formatted(baseURI)) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .post(Entity.json(Project.builder().name(projectName).build()))) { + + 
assertThat(response.getStatusInfo().getStatusCode()).isEqualTo(201); + return UUID.fromString( + response.getLocation().getPath().substring(response.getLocation().getPath().lastIndexOf('/') + 1)); + } + } + @Nested @DisplayName("Api Key Authentication:") @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -2958,122 +2972,124 @@ void getByProjectName__whenFilterInvalidValueOrKeyForFieldType__thenReturn400(Fi assertThat(actualError).isEqualTo(expectedError); } - private void getAndAssertPage( - String workspaceName, - String projectName, - List filters, - List spans, - List expectedSpans, - List unexpectedSpans, String apiKey) { - int page = 1; - int size = spans.size() + expectedSpans.size() + unexpectedSpans.size(); - getAndAssertPage( - workspaceName, - projectName, - null, - null, - null, - filters, - page, - size, - expectedSpans, - expectedSpans.size(), - unexpectedSpans, apiKey); + private List updateFeedbackScore(List feedbackScores, int index, double val) { + feedbackScores.set(index, feedbackScores.get(index).toBuilder() + .value(BigDecimal.valueOf(val)) + .build()); + return feedbackScores; } - private void getAndAssertPage( - String workspaceName, - String projectName, - UUID projectId, - UUID traceId, - SpanType type, - List filters, - int page, - int size, - List expectedSpans, - int expectedTotal, - List unexpectedSpans, String apiKey) { - try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) - .queryParam("page", page) - .queryParam("size", size) - .queryParam("project_name", projectName) - .queryParam("project_id", projectId) - .queryParam("trace_id", traceId) - .queryParam("type", type) - .queryParam("filters", toURLEncodedQueryParam(filters)) - .request() - .header(HttpHeaders.AUTHORIZATION, apiKey) - .header(WORKSPACE_HEADER, workspaceName) - .get()) { - var actualPage = actualResponse.readEntity(Span.SpanPage.class); - var actualSpans = actualPage.content(); + private List updateFeedbackScore( + List destination, List source, 
int index) { + destination.set(index, source.get(index).toBuilder().build()); + return destination; + } + } - assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200); + private void getAndAssertPage( + String workspaceName, + String projectName, + List filters, + List spans, + List expectedSpans, + List unexpectedSpans, String apiKey) { + int page = 1; + int size = spans.size() + expectedSpans.size() + unexpectedSpans.size(); + getAndAssertPage( + workspaceName, + projectName, + null, + null, + null, + filters, + page, + size, + expectedSpans, + expectedSpans.size(), + unexpectedSpans, apiKey); + } - assertThat(actualPage.page()).isEqualTo(page); - assertThat(actualPage.size()).isEqualTo(expectedSpans.size()); - assertThat(actualPage.total()).isEqualTo(expectedTotal); + private void getAndAssertPage( + String workspaceName, + String projectName, + UUID projectId, + UUID traceId, + SpanType type, + List filters, + int page, + int size, + List expectedSpans, + int expectedTotal, + List unexpectedSpans, String apiKey) { + try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .queryParam("page", page) + .queryParam("size", size) + .queryParam("project_name", projectName) + .queryParam("project_id", projectId) + .queryParam("trace_id", traceId) + .queryParam("type", type) + .queryParam("filters", toURLEncodedQueryParam(filters)) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .get()) { + var actualPage = actualResponse.readEntity(Span.SpanPage.class); + var actualSpans = actualPage.content(); - assertThat(actualSpans.size()).isEqualTo(expectedSpans.size()); - assertThat(actualSpans) - .usingRecursiveFieldByFieldElementComparatorIgnoringFields(IGNORED_FIELDS) - .containsExactlyElementsOf(expectedSpans); - assertIgnoredFields(actualSpans, expectedSpans); + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200); + + assertThat(actualPage.page()).isEqualTo(page); 
+ assertThat(actualPage.size()).isEqualTo(expectedSpans.size()); + assertThat(actualPage.total()).isEqualTo(expectedTotal); + assertThat(actualSpans.size()).isEqualTo(expectedSpans.size()); + assertThat(actualSpans) + .usingRecursiveFieldByFieldElementComparatorIgnoringFields(IGNORED_FIELDS) + .containsExactlyElementsOf(expectedSpans); + assertIgnoredFields(actualSpans, expectedSpans); + + if (!unexpectedSpans.isEmpty()) { assertThat(actualSpans) .usingRecursiveFieldByFieldElementComparatorIgnoringFields(IGNORED_FIELDS) .doesNotContainAnyElementsOf(unexpectedSpans); } } + } - private String toURLEncodedQueryParam(List filters) { - return CollectionUtils.isEmpty(filters) - ? null - : URLEncoder.encode(JsonUtils.writeValueAsString(filters), StandardCharsets.UTF_8); - } + private String toURLEncodedQueryParam(List filters) { + return CollectionUtils.isEmpty(filters) + ? null + : URLEncoder.encode(JsonUtils.writeValueAsString(filters), StandardCharsets.UTF_8); + } - private void assertIgnoredFields(List actualSpans, List expectedSpans) { - for (int i = 0; i < actualSpans.size(); i++) { - var actualSpan = actualSpans.get(i); - var expectedSpan = expectedSpans.get(i); - var expectedFeedbackScores = expectedSpan.feedbackScores() == null - ? 
null - : expectedSpan.feedbackScores().reversed(); - assertThat(actualSpan.projectId()).isNotNull(); - assertThat(actualSpan.projectName()).isNull(); - assertThat(actualSpan.createdAt()).isAfter(expectedSpan.createdAt()); - assertThat(actualSpan.lastUpdatedAt()).isAfter(expectedSpan.lastUpdatedAt()); - assertThat(actualSpan.feedbackScores()) - .usingRecursiveComparison( - RecursiveComparisonConfiguration.builder() - .withComparatorForType(BigDecimal::compareTo, BigDecimal.class) - .withIgnoredFields(IGNORED_FIELDS_SCORES) - .build()) - .isEqualTo(expectedFeedbackScores); - - if (actualSpan.feedbackScores() != null) { - actualSpan.feedbackScores().forEach(feedbackScore -> { - assertThat(feedbackScore.createdAt()).isAfter(expectedSpan.createdAt()); - assertThat(feedbackScore.lastUpdatedAt()).isAfter(expectedSpan.lastUpdatedAt()); - assertThat(feedbackScore.createdBy()).isEqualTo(USER); - assertThat(feedbackScore.lastUpdatedBy()).isEqualTo(USER); - }); - } + private void assertIgnoredFields(List actualSpans, List expectedSpans) { + for (int i = 0; i < actualSpans.size(); i++) { + var actualSpan = actualSpans.get(i); + var expectedSpan = expectedSpans.get(i); + var expectedFeedbackScores = expectedSpan.feedbackScores() == null + ? 
null + : expectedSpan.feedbackScores().reversed(); + assertThat(actualSpan.projectId()).isNotNull(); + assertThat(actualSpan.projectName()).isNull(); + assertThat(actualSpan.createdAt()).isAfter(expectedSpan.createdAt()); + assertThat(actualSpan.lastUpdatedAt()).isAfter(expectedSpan.lastUpdatedAt()); + assertThat(actualSpan.feedbackScores()) + .usingRecursiveComparison( + RecursiveComparisonConfiguration.builder() + .withComparatorForType(BigDecimal::compareTo, BigDecimal.class) + .withIgnoredFields(IGNORED_FIELDS_SCORES) + .build()) + .isEqualTo(expectedFeedbackScores); + + if (actualSpan.feedbackScores() != null) { + actualSpan.feedbackScores().forEach(feedbackScore -> { + assertThat(feedbackScore.createdAt()).isAfter(expectedSpan.createdAt()); + assertThat(feedbackScore.lastUpdatedAt()).isAfter(expectedSpan.lastUpdatedAt()); + assertThat(feedbackScore.createdBy()).isEqualTo(USER); + assertThat(feedbackScore.lastUpdatedBy()).isEqualTo(USER); + }); } } - - private List updateFeedbackScore(List feedbackScores, int index, double val) { - feedbackScores.set(index, feedbackScores.get(index).toBuilder() - .value(BigDecimal.valueOf(val)) - .build()); - return feedbackScores; - } - - private List updateFeedbackScore( - List destination, List source, int index) { - destination.set(index, source.get(index).toBuilder().build()); - return destination; - } } private UUID createAndAssert(Span expectedSpan, String apiKey, String workspaceName) { @@ -3189,6 +3205,130 @@ void createWhenTryingToCreateSpanTwice() { } } + @Nested + @DisplayName("Batch:") + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class BatchInsert { + + @Test + void batch__whenCreateSpans__thenReturnNoContent() { + + String projectName = UUID.randomUUID().toString(); + UUID projectId = createProject(projectName, TEST_WORKSPACE, API_KEY); + + var expectedSpans = IntStream.range(0, 1000) + .mapToObj(i -> podamFactory.manufacturePojo(Span.class).toBuilder() + .projectId(projectId) + .projectName(projectName) + 
.parentSpanId(null) + .feedbackScores(null) + .build()) + .toList(); + + batchCreateAndAssert(expectedSpans, API_KEY, TEST_WORKSPACE); + + getAndAssertPage(TEST_WORKSPACE, projectName, List.of(), List.of(), expectedSpans.reversed(), List.of(), + API_KEY); + } + + @Test + void batch__whenSendingMultipleSpansWithSameId__thenReturn422() { + var expectedSpans = List.of(podamFactory.manufacturePojo(Span.class).toBuilder() + .projectId(null) + .parentSpanId(null) + .feedbackScores(null) + .build()); + + var expectedSpan = expectedSpans.getFirst().toBuilder() + .tags(Set.of()) + .endTime(Instant.now()) + .output(JsonUtils.getJsonNodeFromString("{ \"output\": \"data\"}")) + .build(); + + List expectedSpans1 = List.of(expectedSpans.getFirst(), expectedSpan); + + try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .path("batch") + .request() + .header(HttpHeaders.AUTHORIZATION, API_KEY) + .header(WORKSPACE_HEADER, TEST_WORKSPACE) + .post(Entity.json(new SpanBatch(expectedSpans1)))) { + + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(422); + assertThat(actualResponse.hasEntity()).isTrue(); + + var errorMessage = actualResponse.readEntity(io.dropwizard.jersey.errors.ErrorMessage.class); + assertThat(errorMessage.getMessage()).isEqualTo("Duplicate span id '%s'".formatted(expectedSpan.id())); + } + } + + @ParameterizedTest + @MethodSource + void batch__whenBatchIsInvalid__thenReturn422(List spans, String errorMessage) { + + try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .path("batch") + .request() + .header(HttpHeaders.AUTHORIZATION, API_KEY) + .header(WORKSPACE_HEADER, TEST_WORKSPACE) + .post(Entity.json(new SpanBatch(spans)))) { + + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(422); + assertThat(actualResponse.hasEntity()).isTrue(); + + var responseBody = actualResponse.readEntity(ErrorMessage.class); + assertThat(responseBody.errors()).contains(errorMessage); + } + } + + Stream 
batch__whenBatchIsInvalid__thenReturn422() { + return Stream.of( + Arguments.of(List.of(), "spans size must be between 1 and 1000"), + Arguments.of(IntStream.range(0, 1001) + .mapToObj(i -> podamFactory.manufacturePojo(Span.class).toBuilder() + .projectId(null) + .parentSpanId(null) + .feedbackScores(null) + .build()) + .toList(), "spans size must be between 1 and 1000")); + } + + @Test + void batch__whenSendingMultipleSpansWithNoId__thenReturnNoContent() { + var newSpan = podamFactory.manufacturePojo(Span.class).toBuilder() + .projectId(null) + .id(null) + .parentSpanId(null) + .feedbackScores(null) + .build(); + + var expectedSpan = newSpan.toBuilder() + .tags(Set.of()) + .endTime(Instant.now()) + .output(JsonUtils.getJsonNodeFromString("{ \"output\": \"data\"}")) + .build(); + + List expectedSpans = List.of(newSpan, expectedSpan); + + batchCreateAndAssert(expectedSpans, API_KEY, TEST_WORKSPACE); + } + + } + + private void batchCreateAndAssert(List expectedSpans, String apiKey, String workspaceName) { + + try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .path("batch") + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .post(Entity.json(new SpanBatch(expectedSpans)))) { + + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(204); + assertThat(actualResponse.hasEntity()).isFalse(); + } + } + private Span getAndAssert(Span expectedSpan, String apiKey, String workspaceName) { try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) .path(expectedSpan.id().toString()) @@ -3389,7 +3529,7 @@ void when__spanDoesNotExist__thenReturnCreateIt() { var actualResponse = getById(id, TEST_WORKSPACE, API_KEY); - var projectId = getProjectId(client, spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); + var projectId = getProjectId(spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); var actualEntity = actualResponse.readEntity(Span.class); 
assertThat(actualEntity.id()).isEqualTo(id); @@ -3433,7 +3573,7 @@ void when__spanUpdateAndInsertAreProcessedOutOfOther__thenReturnSpan() { var actualResponse = getById(id, TEST_WORKSPACE, API_KEY); - var projectId = getProjectId(client, spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); + var projectId = getProjectId(spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); var actualEntity = actualResponse.readEntity(Span.class); assertThat(actualEntity.id()).isEqualTo(id); @@ -3641,7 +3781,7 @@ void when__multipleSpanUpdateAndInsertAreProcessedOutOfOtherAndConcurrent__thenR var actualEntity = actualResponse.readEntity(Span.class); assertThat(actualEntity.id()).isEqualTo(id); - var projectId = getProjectId(client, projectName, TEST_WORKSPACE, API_KEY); + var projectId = getProjectId(projectName, TEST_WORKSPACE, API_KEY); assertThat(actualEntity.projectId()).isEqualTo(projectId); assertThat(actualEntity.traceId()).isEqualTo(spanUpdate1.traceId()); @@ -3680,7 +3820,7 @@ void update__whenTagsIsEmpty__thenAcceptUpdate() { runPatchAndAssertStatus(expectedSpan.id(), spanUpdate, API_KEY, TEST_WORKSPACE); - UUID projectId = getProjectId(client, spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); + UUID projectId = getProjectId(spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); Span updatedSpan = expectedSpan.toBuilder() .tags(spanUpdate.tags()) @@ -3713,7 +3853,7 @@ void update__whenMetadataIsEmpty__thenAcceptUpdate() { runPatchAndAssertStatus(expectedSpan.id(), spanUpdate, API_KEY, TEST_WORKSPACE); - UUID projectId = getProjectId(client, spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); + UUID projectId = getProjectId(spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); Span updatedSpan = expectedSpan.toBuilder() .metadata(metadata) @@ -3746,7 +3886,7 @@ void update__whenInputIsEmpty__thenAcceptUpdate() { runPatchAndAssertStatus(expectedSpan.id(), spanUpdate, API_KEY, TEST_WORKSPACE); - UUID projectId = getProjectId(client, spanUpdate.projectName(), TEST_WORKSPACE, 
API_KEY); + UUID projectId = getProjectId(spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); Span updatedSpan = expectedSpan.toBuilder() .input(input) @@ -3778,7 +3918,7 @@ void update__whenOutputIsEmpty__thenAcceptUpdate() { runPatchAndAssertStatus(expectedSpan.id(), spanUpdate, API_KEY, TEST_WORKSPACE); - UUID projectId = getProjectId(client, spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); + UUID projectId = getProjectId(spanUpdate.projectName(), TEST_WORKSPACE, API_KEY); Span updatedSpan = expectedSpan.toBuilder() .output(output) @@ -3800,7 +3940,7 @@ void update__whenUpdatingUsingProjectId__thenAcceptUpdate() { createAndAssert(expectedSpan, API_KEY, TEST_WORKSPACE); - var projectId = getProjectId(client, expectedSpan.projectName(), TEST_WORKSPACE, API_KEY); + var projectId = getProjectId(expectedSpan.projectName(), TEST_WORKSPACE, API_KEY); var spanUpdate = podamFactory.manufacturePojo(SpanUpdate.class).toBuilder() .traceId(expectedSpan.traceId()) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/DummyLockService.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/DummyLockService.java new file mode 100644 index 0000000000..faf96f9979 --- /dev/null +++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/DummyLockService.java @@ -0,0 +1,18 @@ +package com.comet.opik.domain; + +import com.comet.opik.infrastructure.redis.LockService; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class DummyLockService implements LockService { + + @Override + public <T> Mono<T> executeWithLock(LockService.Lock lock, Mono<T> action) { + return action; + } + + @Override + public <T> Flux<T> executeWithLock(LockService.Lock lock, Flux<T> action) { + return action; + } +} \ No newline at end of file diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/SpanServiceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/SpanServiceTest.java index 7570c2406c..c5e404b98a 100644 ---
a/apps/opik-backend/src/test/java/com/comet/opik/domain/SpanServiceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/SpanServiceTest.java @@ -10,7 +10,6 @@ import jakarta.ws.rs.NotFoundException; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import uk.co.jemos.podam.api.PodamFactory; @@ -26,18 +25,7 @@ @Disabled class SpanServiceTest { - private static final LockService DUMMY_LOCK_SERVICE = new LockService() { - - @Override - public <T> Mono<T> executeWithLock(Lock lock, Mono<T> action) { - return action; - } - - @Override - public <T> Flux<T> executeWithLock(Lock lock, Flux<T> action) { - return action; - } - }; + private static final LockService DUMMY_LOCK_SERVICE = new DummyLockService(); private final PodamFactory podamFactory = PodamFactoryUtils.newPodamFactory(); diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java index 78d0a40666..2188190fdc 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java @@ -19,7 +19,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import uk.co.jemos.podam.api.PodamFactory; import uk.co.jemos.podam.api.PodamFactoryImpl; @@ -40,18 +39,7 @@ @ExtendWith(MockitoExtension.class) class TraceServiceImplTest { - public static final LockService DUMMY_LOCK_SERVICE = new LockService() { - - @Override - public <T> Mono<T> executeWithLock(Lock lock, Mono<T> action) { - return action; - } - - @Override - public <T> Flux<T> executeWithLock(Lock lock, Flux<T> action) { - return action; - } - }; + public static final LockService DUMMY_LOCK_SERVICE = new DummyLockService();
private TraceServiceImpl traceService;