From 7a6db001505671628935f58f0ce0143ce97d05eb Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Thu, 12 Dec 2024 15:34:57 +0100 Subject: [PATCH 01/22] [OPIK-446] Add durartion to trace_span --- .../src/main/java/com/comet/opik/api/Span.java | 14 ++++++++++++++ .../src/main/java/com/comet/opik/api/Trace.java | 14 ++++++++++++++ .../api/resources/v1/priv/TracesResourceTest.java | 2 ++ 3 files changed, 30 insertions(+) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java index 2ebc3b1cd9..d31afd38ea 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java @@ -2,6 +2,7 @@ import com.comet.opik.domain.SpanType; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonView; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.PropertyNamingStrategies; @@ -13,6 +14,7 @@ import lombok.Builder; import java.math.BigDecimal; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; @@ -73,4 +75,16 @@ public static class Write { public static class Public { } } + + @JsonProperty + @JsonView({Span.View.Public.class}) + @Schema(accessMode = Schema.AccessMode.READ_ONLY) + public Double duration() { + if (endTime == null) { + return null; + } + + long micros = Duration.between(startTime, endTime).toNanos() / 1_000; + return micros / 1_000.0; + } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java index b5f04a696a..056f7b07b7 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java @@ -1,6 +1,7 @@ package com.comet.opik.api; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonView; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.PropertyNamingStrategies; @@ -12,6 +13,7 @@ import lombok.Builder; import java.math.BigDecimal; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; @@ -68,4 +70,16 @@ public static class Write { public static class Public { } } + + @JsonProperty + @JsonView({Span.View.Public.class}) + @Schema(accessMode = Schema.AccessMode.READ_ONLY) + public Double duration() { + if (endTime == null) { + return null; + } + + long micros = Duration.between(startTime, endTime).toNanos() / 1_000; + return micros / 1_000.0; + } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java index bf563902d0..206c437269 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java @@ -19,6 +19,8 @@ import com.comet.opik.api.filter.Field; import com.comet.opik.api.filter.Filter; import com.comet.opik.api.filter.Operator; +import com.comet.opik.api.filter.SpanField; +import com.comet.opik.api.filter.SpanFilter; import com.comet.opik.api.filter.TraceField; import com.comet.opik.api.filter.TraceFilter; import com.comet.opik.api.resources.utils.AuthTestUtils; From 4b3dd68e1e10b5e27a7a4a6734804807611e29c6 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Thu, 12 Dec 2024 15:38:39 +0100 Subject: [PATCH 02/22] Fix calc --- .../main/java/com/comet/opik/api/Span.java | 11 +++------ .../main/java/com/comet/opik/api/Trace.java | 11 +++------ .../com/comet/opik/utils/DurationUtils.java | 23 +++++++++++++++++++ 3 files changed, 29 insertions(+), 16 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java index d31afd38ea..427607cdb2 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java @@ -1,6 +1,7 @@ package com.comet.opik.api; import com.comet.opik.domain.SpanType; +import com.comet.opik.utils.DurationUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonView; @@ -14,7 +15,6 @@ import lombok.Builder; import java.math.BigDecimal; -import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; @@ -78,13 +78,8 @@ public static class Public { @JsonProperty @JsonView({Span.View.Public.class}) - @Schema(accessMode = Schema.AccessMode.READ_ONLY) + @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") public Double duration() { - if (endTime == null) { - return null; - } - - long micros = Duration.between(startTime, endTime).toNanos() / 1_000; - return micros / 1_000.0; + return DurationUtils.getDurationInSeconds(startTime, endTime); } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java index 056f7b07b7..6ab72f6fa1 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java @@ -1,5 +1,6 @@ package com.comet.opik.api; +import com.comet.opik.utils.DurationUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonView; @@ -13,7 +14,6 @@ import lombok.Builder; import java.math.BigDecimal; -import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; @@ -73,13 +73,8 @@ public static class Public { @JsonProperty @JsonView({Span.View.Public.class}) - @Schema(accessMode = Schema.AccessMode.READ_ONLY) + @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") public Double duration() { - if (endTime == null) { - return null; - } - - long micros = Duration.between(startTime, endTime).toNanos() / 1_000; - return micros / 1_000.0; + return DurationUtils.getDurationInSeconds(startTime, endTime); } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java new file mode 100644 index 0000000000..2fd55f9fd5 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java @@ -0,0 +1,23 @@ +package com.comet.opik.utils; + +import lombok.NonNull; +import lombok.experimental.UtilityClass; + +import java.time.Duration; +import java.time.Instant; + +@UtilityClass +public class DurationUtils { + + public static final Double TIME_UNIT = 1_000.0; + + public static Double getDurationInSeconds(@NonNull Instant startTime, Instant endTime) { + if (endTime == null) { + return null; + } + + long micros = Duration.between(startTime, endTime).toNanos() / TIME_UNIT.longValue(); + return micros / TIME_UNIT; + } + +} From f4da0e3348afb748aa9a2153d8a2b58b7294baad Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 13 Dec 2024 09:39:04 +0100 Subject: [PATCH 03/22] Fix pr review --- .../main/java/com/comet/opik/api/Span.java | 2 +- .../main/java/com/comet/opik/api/Trace.java | 2 +- .../com/comet/opik/utils/DurationUtils.java | 2 +- .../resources/v1/priv/SpansResourceTest.java | 6 +- .../resources/v1/priv/TracesResourceTest.java | 87 +++++++++++++++++-- 5 files changed, 89 insertions(+), 10 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java index 427607cdb2..d75842f042 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java @@ -80,6 +80,6 @@ public static class Public { @JsonView({Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") public Double duration() { - return DurationUtils.getDurationInSeconds(startTime, endTime); + return DurationUtils.getDurationInMillisWithSubMilliPrecision(startTime, endTime); } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java index 6ab72f6fa1..c42a403644 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java @@ -75,6 +75,6 @@ public static class Public { @JsonView({Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") public Double duration() { - return DurationUtils.getDurationInSeconds(startTime, endTime); + return DurationUtils.getDurationInMillisWithSubMilliPrecision(startTime, endTime); } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java index 2fd55f9fd5..74ddd85010 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java @@ -11,7 +11,7 @@ public class DurationUtils { public static final Double TIME_UNIT = 1_000.0; - public static Double getDurationInSeconds(@NonNull Instant startTime, Instant endTime) { + public static Double getDurationInMillisWithSubMilliPrecision(@NonNull Instant startTime, Instant endTime) { if (endTime == null) { return null; } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java index 2b5c224280..c62bca1229 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java @@ -7607,7 +7607,8 @@ static Stream getSpanStats__whenFilterInvalidOperatorForFieldType__thenR .field(SpanField.DURATION) .operator(Operator.STARTS_WITH) .value("1") - .build()); + .build() + ); } @ParameterizedTest @@ -7721,7 +7722,8 @@ static Stream getSpanStats__whenFilterInvalidValueOrKeyForFieldType__the .field(SpanField.DURATION) .operator(Operator.EQUAL) .value(RandomStringUtils.randomAlphanumeric(5)) - .build()); + .build() + ); } @ParameterizedTest diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java index 206c437269..34ed7bf5f9 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java @@ -19,8 +19,6 @@ import com.comet.opik.api.filter.Field; import com.comet.opik.api.filter.Filter; import com.comet.opik.api.filter.Operator; -import com.comet.opik.api.filter.SpanField; -import com.comet.opik.api.filter.SpanFilter; import com.comet.opik.api.filter.TraceField; import com.comet.opik.api.filter.TraceFilter; import com.comet.opik.api.resources.utils.AuthTestUtils; @@ -3084,7 +3082,8 @@ static Stream getTracesByProject__whenFilterInvalidOperatorForFieldType_ .field(TraceField.DURATION) .operator(Operator.NOT_CONTAINS) .value("1") - .build()); + .build() + ); } @ParameterizedTest @@ -6819,6 +6818,82 @@ void getTraceStats__whenFilterByDuration__thenReturnTracesFiltered(Operator oper getStatsAndAssert(projectName, null, filters, apiKey, workspaceName, expectedStats); } + Stream getTraceStats__whenFilterByDuration__thenReturnTracesFiltered() { + return Stream.of( + arguments(Operator.EQUAL, + Duration.ofMillis(1L).toNanos() / 1000, 1.0), + arguments(Operator.GREATER_THAN, + Duration.ofMillis(8L).toNanos() / 1000, 7.0), + arguments(Operator.GREATER_THAN_EQUAL, + Duration.ofMillis(1L).toNanos() / 1000, 1.0), + arguments(Operator.GREATER_THAN_EQUAL, + Duration.ofMillis(1L).plusNanos(1000).toNanos() / 1000, 1.0), + arguments(Operator.LESS_THAN, + Duration.ofMillis(1L).plusNanos(1).toNanos() / 1000, 2.0), + arguments(Operator.LESS_THAN_EQUAL, + Duration.ofMillis(1L).toNanos() / 1000, 1.0), + arguments(Operator.LESS_THAN_EQUAL, + Duration.ofMillis(1L).toNanos() / 1000, 2.0)); + } + + @ParameterizedTest + @MethodSource + void getTraceStats__whenFilterByDuration__thenReturnTracesFiltered(Operator operator, long end, double duration) { + String workspaceName = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + String apiKey = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + var projectName = generator.generate().toString(); + var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class) + .stream() + .map(trace -> { + Instant now = Instant.now(); + return trace.toBuilder() + .projectId(null) + .usage(null) + .projectName(projectName) + .feedbackScores(null) + .totalEstimatedCost(BigDecimal.ZERO) + .startTime(now) + .endTime(Set.of(Operator.LESS_THAN, Operator.LESS_THAN_EQUAL).contains(operator) + ? Instant.now().plusSeconds(2) + : now.plusNanos(1000)) + .build(); + }) + .collect(Collectors.toCollection(ArrayList::new)); + + var start = Instant.now().truncatedTo(ChronoUnit.MILLIS); + traces.set(0, traces.getFirst().toBuilder() + .startTime(start) + .endTime(start.plus(end, ChronoUnit.MICROS)) + .build()); + + traces.forEach(expectedTrace -> create(expectedTrace, apiKey, workspaceName)); + + var expectedTraces = List.of(traces.getFirst()); + + var unexpectedTraces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class).stream() + .map(span -> span.toBuilder() + .projectId(null) + .build()) + .toList(); + + unexpectedTraces.forEach(expectedTrace -> create(expectedTrace, apiKey, workspaceName)); + + var filters = List.of( + TraceFilter.builder() + .field(TraceField.DURATION) + .operator(operator) + .value(String.valueOf(duration)) + .build()); + + var expectedStats = getProjectTraceStatItems(expectedTraces); + + getStatsAndAssert(projectName, null, filters, apiKey, workspaceName, expectedStats); + } + private void getStatsAndAssert(String projectName, UUID projectId, List filters, String apiKey, String workspaceName, List> expectedStats) { WebTarget webTarget = client.target(URL_TEMPLATE.formatted(baseURI)) @@ -7634,7 +7709,8 @@ static Stream getTraceStats__whenFilterInvalidOperatorForFieldType__then .field(TraceField.DURATION) .operator(Operator.NOT_CONTAINS) .value("1") - .build()); + .build() + ); } @ParameterizedTest @@ -7735,7 +7811,8 @@ static Stream getTraceStats__whenFilterInvalidValueOrKeyForFieldType__th .field(TraceField.DURATION) .operator(Operator.EQUAL) .value(RandomStringUtils.randomAlphanumeric(5)) - .build()); + .build() + ); } @ParameterizedTest From 4b441daed5b9e67c9e9d5af22a29b952f0eb5aa8 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 13 Dec 2024 12:05:20 +0100 Subject: [PATCH 04/22] [OPIK-446] Add duration metric --- .../com/comet/opik/domain/ProjectMetricsDAO.java | 1 + .../comet/opik/domain/ProjectMetricsService.java | 14 +++++++++----- .../v1/priv/ProjectMetricsResourceTest.java | 4 ++++ 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java index cbeb50af85..4ff0ab041d 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java @@ -56,6 +56,7 @@ record Entry(String name, Instant time, Number value) { @Singleton @RequiredArgsConstructor(onConstructor_ = @Inject) class ProjectMetricsDAOImpl implements ProjectMetricsDAO { + private final @NonNull TransactionTemplateAsync template; private static final Map INTERVAL_TO_SQL = Map.of( diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java index dce05d0dc8..7c43f2bd45 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java @@ -28,16 +28,20 @@ public interface ProjectMetricsService { @Slf4j @Singleton class ProjectMetricsServiceImpl implements ProjectMetricsService { + private final @NonNull Map>>> metricHandler; + @Inject + public ProjectMetricsServiceImpl(@NonNull Map>>> metricHandler; + @Inject public ProjectMetricsServiceImpl(@NonNull ProjectMetricsDAO projectMetricsDAO) { metricHandler = Map.of( - MetricType.TRACE_COUNT, projectMetricsDAO::getTraceCount, - MetricType.FEEDBACK_SCORES, projectMetricsDAO::getFeedbackScores, - MetricType.TOKEN_USAGE, projectMetricsDAO::getTokenUsage, - MetricType.COST, projectMetricsDAO::getCost, - MetricType.DURATION, projectMetricsDAO::getDuration); + MetricType.TRACE_COUNT, projectMetricsDAO::getTraceCount, + MetricType.FEEDBACK_SCORES, projectMetricsDAO::getFeedbackScores, + MetricType.TOKEN_USAGE, projectMetricsDAO::getTokenUsage, + MetricType.COST, projectMetricsDAO::getCost, + MetricType.DURATION, projectMetricsDAO::getDuration); } @Override diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java index 97a1de319b..6a77f1b6e5 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java @@ -299,6 +299,7 @@ void getProjectMetrics__whenSessionTokenIsPresent__thenReturnProperResponse( @DisplayName("Number of traces") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class NumberOfTracesTest { + @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { @@ -440,6 +441,7 @@ private void createTraces(String projectName, Instant marker, int count) { @DisplayName("Feedback scores") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class FeedbackScoresTest { + @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { @@ -530,6 +532,7 @@ private static BigDecimal calcAverage(List scores) { @DisplayName("Token usage") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class TokenUsageTest { + @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { @@ -640,6 +643,7 @@ private void getAndAssertEmpty(UUID projectId, TimeInterval interval, Instant ma @DisplayName("Cost") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class CostTest { + @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { From 2ca8f19740973b2a9aae85525909ddd1753be237 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 13 Dec 2024 12:06:51 +0100 Subject: [PATCH 05/22] Code format --- .../comet/opik/domain/ProjectMetricsService.java | 13 +++++-------- .../api/resources/v1/priv/SpansResourceTest.java | 6 ++---- .../api/resources/v1/priv/TracesResourceTest.java | 12 +++++------- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java index 7c43f2bd45..5c5b0b3c72 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java @@ -31,17 +31,14 @@ class ProjectMetricsServiceImpl implements ProjectMetricsService { private final @NonNull Map>>> metricHandler; - @Inject - public ProjectMetricsServiceImpl(@NonNull Map>>> metricHandler; - @Inject public ProjectMetricsServiceImpl(@NonNull ProjectMetricsDAO projectMetricsDAO) { metricHandler = Map.of( - MetricType.TRACE_COUNT, projectMetricsDAO::getTraceCount, - MetricType.FEEDBACK_SCORES, projectMetricsDAO::getFeedbackScores, - MetricType.TOKEN_USAGE, projectMetricsDAO::getTokenUsage, - MetricType.COST, projectMetricsDAO::getCost, - MetricType.DURATION, projectMetricsDAO::getDuration); + MetricType.TRACE_COUNT, projectMetricsDAO::getTraceCount, + MetricType.FEEDBACK_SCORES, projectMetricsDAO::getFeedbackScores, + MetricType.TOKEN_USAGE, projectMetricsDAO::getTokenUsage, + MetricType.COST, projectMetricsDAO::getCost, + MetricType.DURATION, projectMetricsDAO::getDuration); } @Override diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java index c62bca1229..2b5c224280 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java @@ -7607,8 +7607,7 @@ static Stream getSpanStats__whenFilterInvalidOperatorForFieldType__thenR .field(SpanField.DURATION) .operator(Operator.STARTS_WITH) .value("1") - .build() - ); + .build()); } @ParameterizedTest @@ -7722,8 +7721,7 @@ static Stream getSpanStats__whenFilterInvalidValueOrKeyForFieldType__the .field(SpanField.DURATION) .operator(Operator.EQUAL) .value(RandomStringUtils.randomAlphanumeric(5)) - .build() - ); + .build()); } @ParameterizedTest diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java index 34ed7bf5f9..185ce44119 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java @@ -3082,8 +3082,7 @@ static Stream getTracesByProject__whenFilterInvalidOperatorForFieldType_ .field(TraceField.DURATION) .operator(Operator.NOT_CONTAINS) .value("1") - .build() - ); + .build()); } @ParameterizedTest @@ -6838,7 +6837,8 @@ Stream getTraceStats__whenFilterByDuration__thenReturnTracesFiltered( @ParameterizedTest @MethodSource - void getTraceStats__whenFilterByDuration__thenReturnTracesFiltered(Operator operator, long end, double duration) { + void getTraceStats__whenFilterByDuration__thenReturnTracesFiltered(Operator operator, long end, + double duration) { String workspaceName = UUID.randomUUID().toString(); String workspaceId = UUID.randomUUID().toString(); String apiKey = UUID.randomUUID().toString(); @@ -7709,8 +7709,7 @@ static Stream getTraceStats__whenFilterInvalidOperatorForFieldType__then .field(TraceField.DURATION) .operator(Operator.NOT_CONTAINS) .value("1") - .build() - ); + .build()); } @ParameterizedTest @@ -7811,8 +7810,7 @@ static Stream getTraceStats__whenFilterInvalidValueOrKeyForFieldType__th .field(TraceField.DURATION) .operator(Operator.EQUAL) .value(RandomStringUtils.randomAlphanumeric(5)) - .build() - ); + .build()); } @ParameterizedTest From 2ddca9617cf4f9f2dda66cd7b24e2050ef3136c4 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 13 Dec 2024 13:04:36 +0100 Subject: [PATCH 06/22] Fix tests --- .../src/main/java/com/comet/opik/api/Span.java | 9 --------- .../src/main/java/com/comet/opik/api/Trace.java | 8 -------- .../api/resources/v1/priv/SpansResourceTest.java | 13 +++++++++++++ .../api/resources/v1/priv/TracesResourceTest.java | 2 ++ 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java index d75842f042..2ebc3b1cd9 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Span.java @@ -1,9 +1,7 @@ package com.comet.opik.api; import com.comet.opik.domain.SpanType; -import com.comet.opik.utils.DurationUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonView; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.PropertyNamingStrategies; @@ -75,11 +73,4 @@ public static class Write { public static class Public { } } - - @JsonProperty - @JsonView({Span.View.Public.class}) - @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") - public Double duration() { - return DurationUtils.getDurationInMillisWithSubMilliPrecision(startTime, endTime); - } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java index c42a403644..d149229cb6 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java @@ -1,8 +1,6 @@ package com.comet.opik.api; -import com.comet.opik.utils.DurationUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonView; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.PropertyNamingStrategies; @@ -71,10 +69,4 @@ public static class Public { } } - @JsonProperty - @JsonView({Span.View.Public.class}) - @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") - public Double duration() { - return DurationUtils.getDurationInMillisWithSubMilliPrecision(startTime, endTime); - } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java index 2b5c224280..560f458971 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java @@ -38,6 +38,7 @@ import com.comet.opik.domain.cost.ModelPrice; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.podam.PodamFactoryUtils; +import com.comet.opik.utils.DurationUtils; import com.comet.opik.utils.JsonUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; @@ -56,6 +57,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; +import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.BeforeAll; @@ -3396,6 +3398,17 @@ private void assertIgnoredFields(List actualSpans, List expectedSpan assertThat(actualSpan.duration()).isEqualTo(expected, within(0.001)); } + SoftAssertions.assertSoftly(softly -> { + var expected = DurationUtils.getDurationInMillisWithSubMilliPrecision( + expectedSpan.startTime(), expectedSpan.endTime()); + + if (actualSpan.duration() == null || expected == null) { + softly.assertThat(actualSpan.duration()).isEqualTo(expected); + } else { + softly.assertThat(actualSpan.duration()).isEqualTo(expected, within(0.001)); + } + }); + if (actualSpan.feedbackScores() != null) { actualSpan.feedbackScores().forEach(feedbackScore -> { assertThat(feedbackScore.createdAt()).isAfter(expectedSpan.createdAt()); diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java index 185ce44119..680acdb535 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java @@ -39,6 +39,7 @@ import com.comet.opik.domain.cost.ModelPrice; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.podam.PodamFactoryUtils; +import com.comet.opik.utils.DurationUtils; import com.comet.opik.utils.JsonUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.uuid.Generators; @@ -53,6 +54,7 @@ import jakarta.ws.rs.core.Response; import org.apache.commons.lang3.RandomStringUtils; import org.apache.http.HttpStatus; +import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterAll; From 5ad8cef5352697aa16bd483b1cf439a40fce09b6 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 13 Dec 2024 17:22:44 +0100 Subject: [PATCH 07/22] Fix condition --- .../db-app-analytics/migrations/000008_add_duration_columns.sql | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000008_add_duration_columns.sql diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000008_add_duration_columns.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000008_add_duration_columns.sql new file mode 100644 index 0000000000..e69de29bb2 From 99d87f806b194ef18816e3f45c8e582d309a4913 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Sat, 14 Dec 2024 16:51:51 +0100 Subject: [PATCH 08/22] [OPIK-287] Add project level aggregations --- .../main/java/com/comet/opik/api/Project.java | 13 +- .../com/comet/opik/domain/ProjectService.java | 54 +++- .../java/com/comet/opik/domain/TraceDAO.java | 38 ++- .../comet/opik/domain/stats/StatsMapper.java | 72 ++++- .../resources/utils/BigDecimalCollectors.java | 53 ++++ .../utils/resources/TraceResourceClient.java | 19 ++ .../v1/priv/ProjectsResourceTest.java | 280 +++++++++++++++--- 7 files changed, 472 insertions(+), 57 deletions(-) create mode 100644 apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java index 5e8b13133b..8e39e14a98 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java @@ -11,8 +11,11 @@ import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.UUID; +import static com.comet.opik.api.ProjectStats.PercentageValues; + @Builder(toBuilder = true) @JsonIgnoreProperties(ignoreUnknown = true) // This annotation is used to specify the strategy to be used for naming of properties for the annotated type. Required so that OpenAPI schema generation uses snake_case @@ -29,7 +32,15 @@ public record Project( @JsonView({Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant lastUpdatedAt, @JsonView({Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) String lastUpdatedBy, @JsonView({ - Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) @Nullable Instant lastUpdatedTraceAt){ + Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) @Nullable Instant lastUpdatedTraceAt, + @JsonView({ + Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) @Nullable List feedbackScores, + @JsonView({ + Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) @Nullable PercentageValues duration, + @JsonView({ + Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) @Nullable Double totalEstimatedCost, + @JsonView({ + Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) @Nullable Map usage){ public static class View { public static class Write { diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java index 6e25544810..109096c306 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java @@ -5,6 +5,7 @@ import com.comet.opik.api.Project.ProjectPage; import com.comet.opik.api.ProjectCriteria; import com.comet.opik.api.ProjectIdLastUpdated; +import com.comet.opik.api.ProjectStats; import com.comet.opik.api.ProjectUpdate; import com.comet.opik.api.error.EntityAlreadyExistsException; import com.comet.opik.api.error.ErrorMessage; @@ -13,6 +14,7 @@ import com.comet.opik.api.sorting.SortingFactoryProjects; import com.comet.opik.api.sorting.SortingField; import com.comet.opik.domain.sorting.SortingQueryBuilder; +import com.comet.opik.domain.stats.StatsMapper; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.infrastructure.db.TransactionTemplateAsync; import com.comet.opik.utils.PaginationUtils; @@ -44,6 +46,7 @@ import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY; import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE; import static java.util.Collections.reverseOrder; +import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toUnmodifiableSet; @@ -199,8 +202,19 @@ public Project get(@NonNull UUID id, @NonNull String workspaceId) { .nonTransaction(connection -> traceDAO.getLastUpdatedTraceAt(Set.of(id), workspaceId, connection)) .block(); + Map> projectStats = getProjectStats(List.of(id), workspaceId); + + return enhanceProject(project, lastUpdatedTraceAt, projectStats); + } + + private Project enhanceProject(Project project, Map lastUpdatedTraceAt, + Map> projectStats) { return project.toBuilder() - .lastUpdatedTraceAt(lastUpdatedTraceAt.get(id)) + .lastUpdatedTraceAt(lastUpdatedTraceAt.get(project.id())) + .feedbackScores(StatsMapper.getStatsFeedbackScores(projectStats.get(project.id()))) + .duration(StatsMapper.getStatsDuration(projectStats.get(project.id()))) + .totalEstimatedCost(StatsMapper.getStatsTotalEstimatedCost(projectStats.get(project.id()))) + .usage(StatsMapper.getStatsUsage(projectStats.get(project.id()))) .build(); } @@ -271,17 +285,36 @@ public Page find(int page, int size, @NonNull ProjectCriteria criteria, return traceDAO.getLastUpdatedTraceAt(projectIds, workspaceId, connection); }).block(); + List projectIds = projectRecordSet.content.stream().map(Project::id).toList(); + + Map> projectStats = getProjectStats(projectIds, workspaceId); + List projects = projectRecordSet.content() .stream() - .map(project -> project.toBuilder() - .lastUpdatedTraceAt(projectLastUpdatedTraceAtMap.get(project.id())) - .build()) + .map(project -> enhanceProject(project, projectLastUpdatedTraceAtMap, projectStats)) .toList(); return new ProjectPage(page, projects.size(), projectRecordSet.total(), projects, sortingFactory.getSortableFields()); } + private Map> getProjectStats(List projectIds, String workspaceId) { + return traceDAO.getStatsByProjectIds(projectIds, workspaceId) + .map(stats -> stats.entrySet().stream() + .map(entry -> { + Map statsMap = entry.getValue() + .stats() + .stream() + .collect(toMap(ProjectStats.ProjectStatItem::getName, + ProjectStats.ProjectStatItem::getValue)); + + return Map.entry(entry.getKey(), statsMap); + }) + .map(entry -> Map.entry(entry.getKey(), entry.getValue())) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue))) + .block(); + } + @Override public List findByIds(String workspaceId, Set ids) { if (ids.isEmpty()) { @@ -403,6 +436,19 @@ public Project retrieveByName(@NonNull String projectName) { return repository.findByNames(workspaceId, List.of(projectName)) .stream() .findFirst() + .map(project -> { + + Map projectLastUpdatedTraceAtMap = transactionTemplateAsync + .nonTransaction(connection -> { + Set projectIds = Set.of(project.id()); + return traceDAO.getLastUpdatedTraceAt(projectIds, workspaceId, connection); + }).block(); + + Map> projectStats = getProjectStats(List.of(project.id()), + workspaceId); + + return enhanceProject(project, projectLastUpdatedTraceAtMap, projectStats); + }) .orElseThrow(this::createNotFoundError); }); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java index 93ec17835f..f85e644320 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java @@ -86,6 +86,8 @@ interface TraceDAO { Mono getStats(TraceSearchCriteria criteria); Mono getDailyTraces(); + + Mono> getStatsByProjectIds(List projectIds, String workspaceId); } @Slf4j @@ -639,7 +641,7 @@ AND notEquals(start_time, toDateTime64('1970-01-01 00:00:00.000', 9)), if(length(metadata) > 0, 1, 0) as metadata_count, length(tags) as tags_count FROM traces - WHERE project_id = :project_id + WHERE project_id IN :project_ids AND workspace_id = :workspace_id AND @@ -651,7 +653,7 @@ AND id IN ( FROM feedback_scores WHERE entity_type = 'trace' AND workspace_id = :workspace_id - AND project_id = :project_id + AND project_id IN :project_ids ORDER BY entity_id DESC, last_updated_at DESC LIMIT 1 BY entity_id, name ) @@ -675,7 +677,7 @@ AND id IN ( total_estimated_cost FROM spans WHERE workspace_id = :workspace_id - AND project_id = :project_id + AND project_id IN :project_ids ORDER BY id DESC, last_updated_at DESC LIMIT 1 BY id ) @@ -699,7 +701,7 @@ LEFT JOIN ( total_estimated_cost FROM spans WHERE workspace_id = :workspace_id - AND project_id = :project_id + AND project_id IN :project_ids ORDER BY id DESC, last_updated_at DESC LIMIT 1 BY id ) @@ -722,7 +724,7 @@ LEFT JOIN ( FROM feedback_scores WHERE entity_type = 'trace' AND workspace_id = :workspace_id - AND project_id = :project_id + AND project_id IN :project_ids ORDER BY entity_id DESC, last_updated_at DESC LIMIT 1 BY entity_id, name ) GROUP BY project_id, entity_id @@ -1174,7 +1176,7 @@ public Mono getStats(@NonNull TraceSearchCriteria criteria) { ST statsSQL = newFindTemplate(SELECT_TRACES_STATS, criteria); var statement = connection.createStatement(statsSQL.render()) - .bind("project_id", criteria.projectId()); + .bind("project_ids", List.of(criteria.projectId()).toArray(UUID[]::new)); bindSearchCriteria(criteria, statement); @@ -1197,6 +1199,30 @@ public Mono getDailyTraces() { .reduce(0L, Long::sum); } + @Override + public Mono> getStatsByProjectIds(@NonNull List projectIds, + @NonNull String workspaceId) { + + if (projectIds.isEmpty()) { + return Mono.just(Map.of()); + } + + return asyncTemplate + .nonTransaction(connection -> { + Statement statement = connection.createStatement(new ST(SELECT_TRACES_STATS).render()) + .bind("project_ids", projectIds.toArray(UUID[]::new)) + .bind("workspace_id", workspaceId); + + return Mono.from(statement.execute()) + .flatMapMany(result -> result.map((row, rowMetadata) -> Map.of( + row.get("project_id", UUID.class), + StatsMapper.mapProjectStats(row, "trace_count")))) + .map(Map::entrySet) + .flatMap(Flux::fromIterable) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + }); + } + @Override @WithSpan public Mono> getLastUpdatedTraceAt( diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/stats/StatsMapper.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/stats/StatsMapper.java index 25ce4d1d93..2bbb24c04d 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/stats/StatsMapper.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/stats/StatsMapper.java @@ -1,5 +1,6 @@ package com.comet.opik.domain.stats; +import com.comet.opik.api.FeedbackScoreAverage; import com.comet.opik.api.ProjectStats; import io.r2dbc.spi.Row; @@ -9,48 +10,59 @@ import java.util.Optional; import java.util.stream.Stream; +import static com.comet.opik.api.ProjectStats.AvgValueStat; +import static com.comet.opik.api.ProjectStats.CountValueStat; +import static com.comet.opik.api.ProjectStats.PercentageValueStat; +import static com.comet.opik.api.ProjectStats.PercentageValues; +import static java.util.stream.Collectors.toMap; + public class StatsMapper { + public static final String USAGE = "usage"; + public static final String FEEDBACK_SCORE = "feedback_scores"; + public static final String TOTAL_ESTIMATED_COST = "total_estimated_cost"; + public static final String DURATION = "duration"; + public static ProjectStats mapProjectStats(Row row, String entityCountLabel) { var stats = Stream.>builder() - .add(new ProjectStats.CountValueStat(entityCountLabel, + .add(new CountValueStat(entityCountLabel, row.get(entityCountLabel, Long.class))) - .add(new ProjectStats.PercentageValueStat("duration", Optional - .ofNullable(row.get("duration", List.class)) - .map(durations -> new ProjectStats.PercentageValues( + .add(new PercentageValueStat(DURATION, Optional + .ofNullable(row.get(DURATION, List.class)) + .map(durations -> new PercentageValues( getP(durations, 0), getP(durations, 1), getP(durations, 2))) .orElse(null))) - .add(new ProjectStats.CountValueStat("input", row.get("input", Long.class))) - .add(new ProjectStats.CountValueStat("output", row.get("output", Long.class))) - .add(new ProjectStats.CountValueStat("metadata", row.get("metadata", Long.class))) - .add(new ProjectStats.AvgValueStat("tags", row.get("tags", Double.class))); + .add(new CountValueStat("input", row.get("input", Long.class))) + .add(new CountValueStat("output", row.get("output", Long.class))) + .add(new CountValueStat("metadata", row.get("metadata", Long.class))) + .add(new AvgValueStat("tags", row.get("tags", Double.class))); BigDecimal totalEstimatedCost = row.get("total_estimated_cost_avg", BigDecimal.class); if (totalEstimatedCost == null) { totalEstimatedCost = BigDecimal.ZERO; } - stats.add(new ProjectStats.AvgValueStat("total_estimated_cost", totalEstimatedCost.doubleValue())); + stats.add(new AvgValueStat(TOTAL_ESTIMATED_COST, totalEstimatedCost.doubleValue())); - Map usage = row.get("usage", Map.class); - Map feedbackScores = row.get("feedback_scores", Map.class); + Map usage = row.get(USAGE, Map.class); + Map feedbackScores = row.get(FEEDBACK_SCORE, Map.class); if (usage != null) { usage.keySet() .stream() .sorted() .forEach(key -> stats - .add(new ProjectStats.AvgValueStat("%s.%s".formatted("usage", key), usage.get(key)))); + .add(new AvgValueStat("%s.%s".formatted(USAGE, key), usage.get(key)))); } if (feedbackScores != null) { feedbackScores.keySet() .stream() .sorted() - .forEach(key -> stats.add(new ProjectStats.AvgValueStat("%s.%s".formatted("feedback_score", key), + .forEach(key -> stats.add(new AvgValueStat("%s.%s".formatted(FEEDBACK_SCORE, key), feedbackScores.get(key)))); } @@ -61,4 +73,38 @@ private static BigDecimal getP(List durations, int index) { return durations.get(index); } + public static Map getStatsUsage(Map stats) { + return Optional.ofNullable(stats) + .map(map -> map.keySet() + .stream() + .filter(k -> k.startsWith(USAGE)) + .map( + k -> Map.entry(k.substring("%s.".formatted(USAGE).length()), (Double) map.get(k))) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue))) + .orElse(null); + } + + public static Double getStatsTotalEstimatedCost(Map stats) { + return Optional.ofNullable(stats) + .map(map -> map.get(TOTAL_ESTIMATED_COST)) + .map(v -> (Double) v) + .orElse(null); + } + + public static List getStatsFeedbackScores(Map stats) { + return Optional.ofNullable(stats) + .map(map -> map.keySet() + .stream() + .filter(k -> k.startsWith(FEEDBACK_SCORE)) + .map(k -> new FeedbackScoreAverage(k.substring("%s.".formatted(FEEDBACK_SCORE).length()), + BigDecimal.valueOf((Double) map.get(k)))) + .toList()) + .orElse(null); + } + + public static PercentageValues getStatsDuration(Map stats) { + return Optional.ofNullable(stats) + .map(map -> (PercentageValues) map.get(DURATION)) + .orElse(null); + } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java new file mode 100644 index 0000000000..0de3476a8b --- /dev/null +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java @@ -0,0 +1,53 @@ +package com.comet.opik.api.resources.utils; + +import com.comet.opik.utils.ValidationUtils; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.function.Function; +import java.util.stream.Collector; + +public class BigDecimalCollectors { + + public static Collector averagingBigDecimal() { + return Collector.of( + // Supplier: Create an array with two elements to hold total and count + () -> new BigDecimal[]{BigDecimal.ZERO, BigDecimal.ZERO}, + // Accumulator: Update total and count + (result, value) -> { + result[0] = result[0].add(value); // Accumulate total + result[1] = result[1].add(BigDecimal.ONE); // Increment count + }, + // Combiner: Merge two arrays (used for parallel streams) + (result1, result2) -> { + result1[0] = result1[0].add(result2[0]); // Combine totals + result1[1] = result1[1].add(result2[1]); // Combine counts + return result1; + }, + // Finisher: Compute the average (total / count) with rounding + result -> result[1].compareTo(BigDecimal.ZERO) == 0 + ? BigDecimal.ZERO // Avoid division by zero + : result[0].divide(result[1], ValidationUtils.SCALE, RoundingMode.HALF_UP)); + } + + public static Collector averagingBigDecimal(Function mapper) { + return Collector.of( + () -> new BigDecimal[]{BigDecimal.ZERO, BigDecimal.ZERO}, + (result, value) -> { + BigDecimal mappedValue = mapper.apply(value); + if (mappedValue != null) { + result[0] = result[0].add(mappedValue); + result[1] = result[1].add(BigDecimal.ONE); + } + }, + (result1, result2) -> { + result1[0] = result1[0].add(result2[0]); + result1[1] = result1[1].add(result2[1]); + return result1; + }, + result -> result[1].compareTo(BigDecimal.ZERO) == 0 + ? BigDecimal.ZERO + : result[0].divide(result[1], ValidationUtils.SCALE, RoundingMode.HALF_UP)); + } + +} diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java index 5e05510943..92e1cd4132 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java @@ -4,10 +4,12 @@ import com.comet.opik.api.FeedbackScore; import com.comet.opik.api.FeedbackScoreBatch; import com.comet.opik.api.FeedbackScoreBatchItem; +import com.comet.opik.api.Span; import com.comet.opik.api.Trace; import com.comet.opik.api.TraceBatch; import com.comet.opik.api.TraceUpdate; import com.comet.opik.api.resources.utils.TestUtils; +import com.comet.opik.domain.cost.ModelPrice; import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.HttpHeaders; @@ -16,8 +18,12 @@ import org.apache.http.HttpStatus; import ru.vyarus.dropwizard.guice.test.ClientSupport; +import java.math.BigDecimal; +import java.util.AbstractMap; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static com.comet.opik.infrastructure.auth.RequestContext.WORKSPACE_HEADER; import static org.assertj.core.api.Assertions.assertThat; @@ -140,4 +146,17 @@ public void updateTrace(UUID id, TraceUpdate traceUpdate, String apiKey, String assertThat(actualResponse.hasEntity()).isFalse(); } } + + public Map aggregateSpansUsage(List spans) { + return spans.stream() + .flatMap(span -> span.usage().entrySet().stream()) + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), Long.valueOf(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum)); + } + + public BigDecimal aggregateSpansCost(List spans) { + return spans.stream() + .map(span -> ModelPrice.fromString(span.model()).calculateCost(span.usage())) + .reduce(BigDecimal.ZERO, BigDecimal::add); + } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 6b85fc99ab..3cc21bd8ba 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -2,37 +2,46 @@ import com.comet.opik.TestComparators; import com.comet.opik.api.BatchDelete; +import com.comet.opik.api.FeedbackScore; +import com.comet.opik.api.FeedbackScoreAverage; +import com.comet.opik.api.FeedbackScoreBatchItem; import com.comet.opik.api.Project; import com.comet.opik.api.ProjectRetrieve; +import com.comet.opik.api.ProjectStats; import com.comet.opik.api.ProjectUpdate; +import com.comet.opik.api.Span; import com.comet.opik.api.Trace; import com.comet.opik.api.TraceUpdate; import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.api.resources.utils.AuthTestUtils; +import com.comet.opik.api.resources.utils.BigDecimalCollectors; import com.comet.opik.api.resources.utils.ClickHouseContainerUtils; import com.comet.opik.api.resources.utils.ClientSupportUtils; import com.comet.opik.api.resources.utils.MigrationUtils; import com.comet.opik.api.resources.utils.MySQLContainerUtils; import com.comet.opik.api.resources.utils.RedisContainerUtils; +import com.comet.opik.api.resources.utils.StatsUtils; import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils; import com.comet.opik.api.resources.utils.TestUtils; import com.comet.opik.api.resources.utils.WireMockUtils; +import com.comet.opik.api.resources.utils.resources.SpanResourceClient; import com.comet.opik.api.resources.utils.resources.TraceResourceClient; import com.comet.opik.api.sorting.Direction; import com.comet.opik.api.sorting.SortableFields; import com.comet.opik.api.sorting.SortingFactory; import com.comet.opik.api.sorting.SortingField; -import com.comet.opik.domain.ProjectService; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.podam.PodamFactoryUtils; +import com.comet.opik.utils.DurationUtils; import com.comet.opik.utils.JsonUtils; +import com.comet.opik.utils.ValidationUtils; import com.github.tomakehurst.wiremock.client.WireMock; import com.redis.testcontainers.RedisContainer; import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; -import org.apache.hc.core5.http.HttpStatus; +import org.apache.http.HttpStatus; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -54,17 +63,22 @@ import ru.vyarus.dropwizard.guice.test.ClientSupport; import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension; import uk.co.jemos.podam.api.PodamFactory; +import uk.co.jemos.podam.api.PodamUtils; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.time.Instant; +import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -80,6 +94,9 @@ import static com.github.tomakehurst.wiremock.client.WireMock.okJson; import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static java.util.stream.Collectors.averagingDouble; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toMap; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -91,7 +108,7 @@ class ProjectsResourceTest { public static final String URL_TEMPLATE = "%s/v1/private/projects"; public static final String URL_TEMPLATE_TRACE = "%s/v1/private/traces"; public static final String[] IGNORED_FIELDS = {"createdBy", "lastUpdatedBy", "createdAt", "lastUpdatedAt", - "lastUpdatedTraceAt"}; + "lastUpdatedTraceAt", "feedbackScores", "duration", "totalEstimatedCost", "usage"}; private static final String API_KEY = UUID.randomUUID().toString(); private static final String USER = UUID.randomUUID().toString(); @@ -123,11 +140,11 @@ class ProjectsResourceTest { private String baseURI; private ClientSupport client; - private ProjectService projectService; private TraceResourceClient traceResourceClient; + private SpanResourceClient spanResourceClient; @BeforeAll - void setUpAll(ClientSupport client, Jdbi jdbi, ProjectService projectService) throws SQLException { + void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters()); @@ -138,13 +155,13 @@ void setUpAll(ClientSupport client, Jdbi jdbi, ProjectService projectService) th this.baseURI = "http://localhost:%d".formatted(client.getPort()); this.client = client; - this.projectService = projectService; ClientSupportUtils.config(client); mockTargetWorkspace(API_KEY, TEST_WORKSPACE, WORKSPACE_ID); this.traceResourceClient = new TraceResourceClient(this.client, baseURI); + this.spanResourceClient = new SpanResourceClient(this.client, baseURI); } private static void mockTargetWorkspace(String apiKey, String workspaceName, String workspaceId) { @@ -1090,10 +1107,202 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt assertThat(actualEntity.content().get(2).lastUpdatedTraceAt()) .isEqualTo(expectedProject.lastUpdatedTraceAt()); - assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2, + assertAllProjectsHavePersistedLastTraceAt(workspaceName, apiKey, List.of(expectedProject, expectedProject2, expectedProject3)); } + @Test + @DisplayName("when projects with traces, spans, feedback scores, and usage, then return project aggregations") + void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsage__thenReturnProjectAggregations() { + String workspaceName = UUID.randomUUID().toString(); + String apiKey = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + var projects = PodamFactoryUtils.manufacturePojoList(factory, Project.class) + .parallelStream() + .map(project -> project.toBuilder() + .id(createProject(project, apiKey, workspaceName)) + .totalEstimatedCost(null) + .usage(null) + .feedbackScores(null) + .duration(null) + .build()) + .toList(); + + List expectedProjects = projects.parallelStream() + .map(project -> buildProjectStats(project, apiKey, workspaceName)) + .sorted(Comparator.comparing(Project::id).reversed()) + .toList(); + + var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .get(); + + var actualEntity = actualResponse.readEntity(Project.ProjectPage.class); + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(org.apache.http.HttpStatus.SC_OK); + + assertThat(expectedProjects).hasSameSizeAs(actualEntity.content()); + + assertThat(actualEntity.content()) + .usingRecursiveComparison() + .ignoringFields("createdBy", "lastUpdatedBy", "createdAt", "lastUpdatedAt", "lastUpdatedTraceAt") + .ignoringCollectionOrder() + .withComparatorForType(StatsUtils::bigDecimalComparator, BigDecimal.class) + .isEqualTo(expectedProjects); + } + + @Test + @DisplayName("when projects without traces, spans, feedback scores, and usage, then return project aggregations") + void getProjects__whenProjectsHasNoTracesSpansFeedbackScoresAndUsage__thenReturnProjectAggregations() { + String workspaceName = UUID.randomUUID().toString(); + String apiKey = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + var projects = PodamFactoryUtils.manufacturePojoList(factory, Project.class) + .parallelStream() + .map(project -> project.toBuilder() + .id(createProject(project, apiKey, workspaceName)) + .totalEstimatedCost(null) + .usage(null) + .feedbackScores(null) + .duration(null) + .build()) + .toList(); + + List expectedProjects = projects.parallelStream() + .map(project -> project.toBuilder() + .duration(null) + .totalEstimatedCost(null) + .usage(null) + .feedbackScores(null) + .build()) + .sorted(Comparator.comparing(Project::id).reversed()) + .toList(); + + var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .get(); + + var actualEntity = actualResponse.readEntity(Project.ProjectPage.class); + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(org.apache.http.HttpStatus.SC_OK); + + assertThat(expectedProjects).hasSameSizeAs(actualEntity.content()); + + assertThat(actualEntity.content()) + .usingRecursiveComparison() + .ignoringFields("createdBy", "lastUpdatedBy", "createdAt", "lastUpdatedAt", "lastUpdatedTraceAt") + .ignoringCollectionOrder() + .withComparatorForType(StatsUtils::bigDecimalComparator, BigDecimal.class) + .isEqualTo(expectedProjects); + } + + private Project buildProjectStats(Project project, String apiKey, String workspaceName) { + var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class).stream() + .map(trace -> { + Instant startTime = Instant.now(); + Instant endTime = startTime.plusMillis(PodamUtils.getIntegerInRange(1, 1000)); + return trace.toBuilder() + .projectName(project.name()) + .startTime(startTime) + .endTime(endTime) + .duration(DurationUtils.getDurationInMillisWithSubMilliPrecision(startTime, endTime)) + .build(); + }) + .toList(); + + traceResourceClient.batchCreateTraces(traces, apiKey, workspaceName); + + List scores = PodamFactoryUtils.manufacturePojoList(factory, + FeedbackScoreBatchItem.class); + + traces = traces.stream().map(trace -> { + List spans = PodamFactoryUtils.manufacturePojoList(factory, Span.class).stream() + .map(span -> span.toBuilder() + .usage(spanResourceClient.getTokenUsage()) + .model(spanResourceClient.randomModelPrice().getName()) + .traceId(trace.id()) + .projectName(trace.projectName()) + .build()) + .toList(); + + spanResourceClient.batchCreateSpans(spans, apiKey, workspaceName); + + List feedbackScores = scores.stream() + .map(feedbackScore -> feedbackScore.toBuilder() + .projectId(project.id()) + .projectName(project.name()) + .id(trace.id()) + .build()) + .toList(); + + traceResourceClient.feedbackScores(feedbackScores, apiKey, workspaceName); + + return trace.toBuilder() + .feedbackScores( + feedbackScores.stream() + .map(score -> FeedbackScore.builder() + .value(score.value()) + .name(score.name()) + .build()) + .toList()) + .usage(traceResourceClient.aggregateSpansUsage(spans)) + .totalEstimatedCost(traceResourceClient.aggregateSpansCost(spans)) + .build(); + }).toList(); + + List durations = StatsUtils.calculateQuantiles( + traces.stream() + .map(Trace::duration) + .toList(), + List.of(0.5, 0.90, 0.99)); + + return project.toBuilder() + .duration(new ProjectStats.PercentageValues(durations.get(0), durations.get(1), durations.get(2))) + .totalEstimatedCost(getTotalEstimatedCost(traces)) + .usage(traces.stream() + .map(Trace::usage) + .flatMap(usage -> usage.entrySet().stream()) + .collect(groupingBy(Map.Entry::getKey, averagingDouble(Map.Entry::getValue)))) + .feedbackScores(getScoreAverages(traces)) + .build(); + } + + private List getScoreAverages(List traces) { + return traces.stream() + .map(Trace::feedbackScores) + .flatMap(List::stream) + .collect(groupingBy(FeedbackScore::name, + BigDecimalCollectors.averagingBigDecimal(FeedbackScore::value))) + .entrySet() + .stream() + .map(entry -> FeedbackScoreAverage.builder() + .name(entry.getKey()) + .value(entry.getValue()) + .build()) + .toList(); + } + + private double getTotalEstimatedCost(List traces) { + long count = traces.stream() + .map(Trace::totalEstimatedCost) + .filter(Objects::nonNull) + .filter(cost -> cost.compareTo(BigDecimal.ZERO) > 0) + .count(); + + return traces.stream() + .map(Trace::totalEstimatedCost) + .reduce(BigDecimal.ZERO, BigDecimal::add) + .divide(BigDecimal.valueOf(count), ValidationUtils.SCALE, RoundingMode.HALF_UP).doubleValue(); + } + @Test @DisplayName("when projects is with traces created in batch, then return project with last updated trace at") void getProjects__whenProjectsHasTracesBatch__thenReturnProjectWithLastUpdatedTraceAt() { @@ -1160,7 +1369,7 @@ void getProjects__whenProjectsHasTracesBatch__thenReturnProjectWithLastUpdatedTr assertThat(actualEntity.content().get(2).lastUpdatedTraceAt()) .isEqualTo(expectedProject.lastUpdatedTraceAt()); - assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2, + assertAllProjectsHavePersistedLastTraceAt(workspaceName, apiKey, List.of(expectedProject, expectedProject2, expectedProject3)); } @@ -1190,21 +1399,35 @@ void getProjects__whenTraceIsUpdated__thenUpdateProjectsLastUpdatedTraceAt() { Project expectedProject = project.toBuilder().id(projectId).lastUpdatedTraceAt(trace.lastUpdatedAt()) .build(); - assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject)); + assertAllProjectsHavePersistedLastTraceAt(workspaceName, apiKey, List.of(expectedProject)); } - private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List expectedProjects) { - List dbProjects = projectService.findByIds(workspaceId, expectedProjects.stream() - .map(Project::id).collect(Collectors.toUnmodifiableSet())); - - for (Project project : expectedProjects) { - Awaitility.await().untilAsserted(() -> { - assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(project.id())) - .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(TestComparators::compareMicroNanoTime) - .isEqualTo(project.lastUpdatedTraceAt()); - }); - } + private void assertAllProjectsHavePersistedLastTraceAt(String workspaceName, String apiKey, + List expectedProjects) { + + Awaitility.await().untilAsserted(() -> { + var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .queryParam("size", 100) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .get(); + + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200); + var actualEntity = actualResponse.readEntity(Project.ProjectPage.class); + + assertThat(actualEntity.size()).isEqualTo(expectedProjects.size()); + + Map actualProjectsByLastTraceAt = actualEntity.content() + .stream() + .collect(toMap(Project::id, Project::lastUpdatedTraceAt)); + + assertThat(actualProjectsByLastTraceAt) + .usingRecursiveComparison() + .withComparatorForType(TestComparators::compareMicroNanoTime, Instant.class) + .isEqualTo(expectedProjects.stream() + .collect(toMap(Project::id, Project::lastUpdatedTraceAt))); + }); } } @@ -1274,17 +1497,8 @@ private UUID createCreateTrace(String projectName, String apiKey, String workspa .projectName(projectName) .build(); - try (var actualResponse = client.target(URL_TEMPLATE_TRACE.formatted(baseURI)) - .request() - .header(HttpHeaders.AUTHORIZATION, apiKey) - .header(WORKSPACE_HEADER, workspaceName) - .post(Entity.json(trace))) { - - assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(201); - assertThat(actualResponse.hasEntity()).isFalse(); - - return TestUtils.getIdFromLocation(actualResponse.getLocation()); - } + traceResourceClient.batchCreateTraces(List.of(trace), apiKey, workspaceName); + return trace.id(); } private Trace getTrace(UUID id, String apiKey, String workspaceName) { From 4114eb06465a0a0e68c44e0c485aa119c42c8833 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 16 Dec 2024 15:18:09 +0200 Subject: [PATCH 09/22] OPIK-287 sort by last updated trace at failing test --- .../v1/priv/ProjectsResourceTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 3cc21bd8ba..456e0ade36 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1155,6 +1155,57 @@ void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsage__thenReturnPr .isEqualTo(expectedProjects); } + @Test + @DisplayName("when projects with traces, spans, feedback scores, and usage and sorted by last updated trace at, then return project aggregations") + void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsageSortedLastTrace__thenReturnProjectAggregations() { + String workspaceName = UUID.randomUUID().toString(); + String apiKey = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + var projects = PodamFactoryUtils.manufacturePojoList(factory, Project.class) + .parallelStream() + .map(project -> project.toBuilder() + .id(createProject(project, apiKey, workspaceName)) + .totalEstimatedCost(null) + .usage(null) + .feedbackScores(null) + .duration(null) + .build()) + .toList(); + + List expectedProjects = projects.parallelStream() + .map(project -> buildProjectStats(project, apiKey, workspaceName)) + .sorted(Comparator.comparing(Project::id).reversed()) + .toList(); + + var sorting = List.of(SortingField.builder() + .field(SortableFields.LAST_UPDATED_TRACE_AT) + .direction(Direction.DESC) + .build()); + + var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .queryParam("sorting", URLEncoder.encode(JsonUtils.writeValueAsString(sorting), + StandardCharsets.UTF_8)) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .get(); + + var actualEntity = actualResponse.readEntity(Project.ProjectPage.class); + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(org.apache.http.HttpStatus.SC_OK); + + assertThat(expectedProjects).hasSameSizeAs(actualEntity.content()); + + assertThat(actualEntity.content()) + .usingRecursiveComparison() + .ignoringFields("createdBy", "lastUpdatedBy", "createdAt", "lastUpdatedAt", "lastUpdatedTraceAt") + .ignoringCollectionOrder() + .withComparatorForType(StatsUtils::bigDecimalComparator, BigDecimal.class) + .isEqualTo(expectedProjects); + } + @Test @DisplayName("when projects without traces, spans, feedback scores, and usage, then return project aggregations") void getProjects__whenProjectsHasNoTracesSpansFeedbackScoresAndUsage__thenReturnProjectAggregations() { From 94625d98af692c81e16e681a89f6689272de0a3a Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 16 Dec 2024 15:26:53 +0200 Subject: [PATCH 10/22] OPIK-287 sort by last updated trace at failing test green --- .../main/java/com/comet/opik/domain/ProjectService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java index 109096c306..c5c809d068 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java @@ -361,10 +361,11 @@ private Page findWithLastTraceSorting(int page, int size, @NonNull Proj return repository.findByIds(new HashSet<>(finalIds), workspaceId); }).stream().collect(Collectors.toMap(Project::id, Function.identity())); + Map> projectStats = getProjectStats(finalIds, workspaceId); + // compose the final projects list by the correct order and add last trace to it - List projects = finalIds.stream().map(id -> projectsById.get(id).toBuilder() - .lastUpdatedTraceAt(projectLastUpdatedTraceAtMap.get(id)) - .build()) + List projects = finalIds.stream().map(projectsById::get) + .map(project -> enhanceProject(project, projectLastUpdatedTraceAtMap, projectStats)) .toList(); return new ProjectPage(page, projects.size(), allProjectIdsLastUpdated.size(), projects, From cd161769bcc88cc15c631afc66cea5c6e79cbbc2 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 16 Dec 2024 15:31:22 +0200 Subject: [PATCH 11/22] OPIK-287 refactor --- .../com/comet/opik/api/resources/utils/StatsUtils.java | 7 +++++++ .../resources/utils/resources/TraceResourceClient.java | 10 ---------- .../api/resources/v1/priv/ProjectsResourceTest.java | 2 +- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java index cd1123e7d4..54c2e132d7 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java @@ -17,6 +17,7 @@ import java.math.RoundingMode; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -318,4 +319,10 @@ public static int bigDecimalComparator(BigDecimal v1, BigDecimal v2) { return strippedV1.toBigInteger().compareTo(strippedV2.toBigInteger()); } + public static Map aggregateSpansUsage(List spans) { + return spans.stream() + .flatMap(span -> span.usage().entrySet().stream()) + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), Long.valueOf(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum)); + } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java index 92e1cd4132..9966e336c0 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java @@ -19,11 +19,8 @@ import ru.vyarus.dropwizard.guice.test.ClientSupport; import java.math.BigDecimal; -import java.util.AbstractMap; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; import static com.comet.opik.infrastructure.auth.RequestContext.WORKSPACE_HEADER; import static org.assertj.core.api.Assertions.assertThat; @@ -147,13 +144,6 @@ public void updateTrace(UUID id, TraceUpdate traceUpdate, String apiKey, String } } - public Map aggregateSpansUsage(List spans) { - return spans.stream() - .flatMap(span -> span.usage().entrySet().stream()) - .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), Long.valueOf(entry.getValue()))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum)); - } - public BigDecimal aggregateSpansCost(List spans) { return spans.stream() .map(span -> ModelPrice.fromString(span.model()).calculateCost(span.usage())) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 456e0ade36..459a500acc 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1304,7 +1304,7 @@ private Project buildProjectStats(Project project, String apiKey, String workspa .name(score.name()) .build()) .toList()) - .usage(traceResourceClient.aggregateSpansUsage(spans)) + .usage(StatsUtils.aggregateSpansUsage(spans)) .totalEstimatedCost(traceResourceClient.aggregateSpansCost(spans)) .build(); }).toList(); From f4511b2ce426f4643759e57c89b31fa2465904ee Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 16 Dec 2024 15:49:49 +0200 Subject: [PATCH 12/22] OPIK-287 restored persisted last updated trace at assertion --- .../v1/priv/ProjectsResourceTest.java | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 459a500acc..798551fd79 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -30,6 +30,7 @@ import com.comet.opik.api.sorting.SortableFields; import com.comet.opik.api.sorting.SortingFactory; import com.comet.opik.api.sorting.SortingField; +import com.comet.opik.domain.ProjectService; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.podam.PodamFactoryUtils; import com.comet.opik.utils.DurationUtils; @@ -79,6 +80,7 @@ import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -140,11 +142,12 @@ class ProjectsResourceTest { private String baseURI; private ClientSupport client; + private ProjectService projectService; private TraceResourceClient traceResourceClient; private SpanResourceClient spanResourceClient; @BeforeAll - void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { + void setUpAll(ClientSupport client, Jdbi jdbi, ProjectService projectService) throws SQLException { MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters()); @@ -155,6 +158,7 @@ void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { this.baseURI = "http://localhost:%d".formatted(client.getPort()); this.client = client; + this.projectService = projectService; ClientSupportUtils.config(client); @@ -1107,7 +1111,7 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt assertThat(actualEntity.content().get(2).lastUpdatedTraceAt()) .isEqualTo(expectedProject.lastUpdatedTraceAt()); - assertAllProjectsHavePersistedLastTraceAt(workspaceName, apiKey, List.of(expectedProject, expectedProject2, + assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2, expectedProject3)); } @@ -1420,7 +1424,7 @@ void getProjects__whenProjectsHasTracesBatch__thenReturnProjectWithLastUpdatedTr assertThat(actualEntity.content().get(2).lastUpdatedTraceAt()) .isEqualTo(expectedProject.lastUpdatedTraceAt()); - assertAllProjectsHavePersistedLastTraceAt(workspaceName, apiKey, List.of(expectedProject, expectedProject2, + assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2, expectedProject3)); } @@ -1450,34 +1454,22 @@ void getProjects__whenTraceIsUpdated__thenUpdateProjectsLastUpdatedTraceAt() { Project expectedProject = project.toBuilder().id(projectId).lastUpdatedTraceAt(trace.lastUpdatedAt()) .build(); - assertAllProjectsHavePersistedLastTraceAt(workspaceName, apiKey, List.of(expectedProject)); + assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject)); } - private void assertAllProjectsHavePersistedLastTraceAt(String workspaceName, String apiKey, - List expectedProjects) { - + private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List expectedProjects) { Awaitility.await().untilAsserted(() -> { - var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) - .queryParam("size", 100) - .request() - .header(HttpHeaders.AUTHORIZATION, apiKey) - .header(WORKSPACE_HEADER, workspaceName) - .get(); - - assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200); - var actualEntity = actualResponse.readEntity(Project.ProjectPage.class); - - assertThat(actualEntity.size()).isEqualTo(expectedProjects.size()); - - Map actualProjectsByLastTraceAt = actualEntity.content() - .stream() + List dbProjects = projectService.findByIds(workspaceId, expectedProjects.stream() + .map(Project::id).collect(Collectors.toUnmodifiableSet())); + Map actualLastTraceByProjectId = dbProjects.stream() + .collect(toMap(Project::id, Project::lastUpdatedTraceAt)); + Map expectedLastTraceByProjectId = expectedProjects.stream() .collect(toMap(Project::id, Project::lastUpdatedTraceAt)); - assertThat(actualProjectsByLastTraceAt) + assertThat(actualLastTraceByProjectId) .usingRecursiveComparison() .withComparatorForType(TestComparators::compareMicroNanoTime, Instant.class) - .isEqualTo(expectedProjects.stream() - .collect(toMap(Project::id, Project::lastUpdatedTraceAt))); + .isEqualTo(expectedLastTraceByProjectId); }); } } From 495a522dbca17acd46e8897a3a8668483222ca0e Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 16 Dec 2024 16:58:03 +0200 Subject: [PATCH 13/22] OPIK-287 fix tests --- .../opik/api/resources/v1/priv/ProjectsResourceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 798551fd79..e4248cec3e 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1125,7 +1125,7 @@ void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsage__thenReturnPr mockTargetWorkspace(apiKey, workspaceName, workspaceId); var projects = PodamFactoryUtils.manufacturePojoList(factory, Project.class) - .parallelStream() + .stream() .map(project -> project.toBuilder() .id(createProject(project, apiKey, workspaceName)) .totalEstimatedCost(null) @@ -1169,7 +1169,7 @@ void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsageSortedLastTrac mockTargetWorkspace(apiKey, workspaceName, workspaceId); var projects = PodamFactoryUtils.manufacturePojoList(factory, Project.class) - .parallelStream() + .stream() .map(project -> project.toBuilder() .id(createProject(project, apiKey, workspaceName)) .totalEstimatedCost(null) From ebef20c1610cc764887dc51d49ef4e8c103cd7fb Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 16 Dec 2024 18:09:36 +0200 Subject: [PATCH 14/22] OPIK-287 fix tests --- .../opik/api/resources/utils/StatsUtils.java | 34 +++++++++++++++++++ .../v1/priv/ProjectsResourceTest.java | 2 ++ 2 files changed, 36 insertions(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java index 54c2e132d7..08e9383a39 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java @@ -319,6 +319,40 @@ public static int bigDecimalComparator(BigDecimal v1, BigDecimal v2) { return strippedV1.toBigInteger().compareTo(strippedV2.toBigInteger()); } + public static int closeToEpsilonComparator(Object v1, Object v2) { + //TODO This is a workaround to compare averages originating from BigDecimals calculated by code vs. the same + // calculated by Clickhouse + + // Handle null cases (if nulls are allowed) + if (v1 == null && v2 == null) { + return 0; // Both null are considered equal + } else if (v1 == null) { + return -1; // Null is considered "less than" + } else if (v2 == null) { + return 1; // Non-null is considered "greater than" + } + + if (v1.equals(v2)) { + return 0; + } + + Number numv1 = (Number) v1, numv2 = (Number) v2; + + // Define an absolute tolerance for comparison + double epsilon = .00001; + + // Calculate the absolute difference + double difference = Math.abs(numv1.doubleValue() - numv2.doubleValue()); + + // If the difference is within the tolerance, consider them equal + if (difference <= epsilon) { + return 0; + } + + // otherwise return ordinary comparison + return 1; + } + public static Map aggregateSpansUsage(List spans) { return spans.stream() .flatMap(span -> span.usage().entrySet().stream()) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index e4248cec3e..b7e5a3a1f7 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1156,6 +1156,7 @@ void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsage__thenReturnPr .ignoringFields("createdBy", "lastUpdatedBy", "createdAt", "lastUpdatedAt", "lastUpdatedTraceAt") .ignoringCollectionOrder() .withComparatorForType(StatsUtils::bigDecimalComparator, BigDecimal.class) + .withComparatorForFields(StatsUtils::closeToEpsilonComparator, "totalEstimatedCost") .isEqualTo(expectedProjects); } @@ -1256,6 +1257,7 @@ void getProjects__whenProjectsHasNoTracesSpansFeedbackScoresAndUsage__thenReturn .ignoringFields("createdBy", "lastUpdatedBy", "createdAt", "lastUpdatedAt", "lastUpdatedTraceAt") .ignoringCollectionOrder() .withComparatorForType(StatsUtils::bigDecimalComparator, BigDecimal.class) + .withComparatorForFields(StatsUtils::closeToEpsilonComparator, "totalEstimatedCost") .isEqualTo(expectedProjects); } From ef8cc20961a6be77e89dded9f165fca33eefb9a0 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 17 Dec 2024 11:12:52 +0200 Subject: [PATCH 15/22] OPIK-287 fix tests --- .../comet/opik/api/resources/v1/priv/ProjectsResourceTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index b7e5a3a1f7..3b08617adf 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1208,6 +1208,7 @@ void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsageSortedLastTrac .ignoringFields("createdBy", "lastUpdatedBy", "createdAt", "lastUpdatedAt", "lastUpdatedTraceAt") .ignoringCollectionOrder() .withComparatorForType(StatsUtils::bigDecimalComparator, BigDecimal.class) + .withComparatorForFields(StatsUtils::closeToEpsilonComparator, "totalEstimatedCost") .isEqualTo(expectedProjects); } From d47beacd00876314434b0b70574688805e688d8f Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 17 Dec 2024 11:13:20 +0200 Subject: [PATCH 16/22] Revert "OPIK-287 fix tests" This reverts commit d9a1f683cfad7adbaa4e6bfd3fc9e2054ac68502. --- .../opik/api/resources/v1/priv/ProjectsResourceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 3b08617adf..0a6917a334 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1125,7 +1125,7 @@ void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsage__thenReturnPr mockTargetWorkspace(apiKey, workspaceName, workspaceId); var projects = PodamFactoryUtils.manufacturePojoList(factory, Project.class) - .stream() + .parallelStream() .map(project -> project.toBuilder() .id(createProject(project, apiKey, workspaceName)) .totalEstimatedCost(null) @@ -1170,7 +1170,7 @@ void getProjects__whenProjectsHasTracesSpansFeedbackScoresAndUsageSortedLastTrac mockTargetWorkspace(apiKey, workspaceName, workspaceId); var projects = PodamFactoryUtils.manufacturePojoList(factory, Project.class) - .stream() + .parallelStream() .map(project -> project.toBuilder() .id(createProject(project, apiKey, workspaceName)) .totalEstimatedCost(null) From 7295cf4fd5e131996e531dd567799c1ff23cee5a Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 18 Dec 2024 18:03:34 +0200 Subject: [PATCH 17/22] OPIK-287 fix logical conflicts after rebase --- .../000008_add_duration_columns.sql | 0 .../resources/v1/priv/SpansResourceTest.java | 13 --- .../resources/v1/priv/TracesResourceTest.java | 79 ------------------- 3 files changed, 92 deletions(-) delete mode 100644 apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000008_add_duration_columns.sql diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000008_add_duration_columns.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000008_add_duration_columns.sql deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java index 560f458971..2b5c224280 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/SpansResourceTest.java @@ -38,7 +38,6 @@ import com.comet.opik.domain.cost.ModelPrice; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.podam.PodamFactoryUtils; -import com.comet.opik.utils.DurationUtils; import com.comet.opik.utils.JsonUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; @@ -57,7 +56,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; -import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.BeforeAll; @@ -3398,17 +3396,6 @@ private void assertIgnoredFields(List actualSpans, List expectedSpan assertThat(actualSpan.duration()).isEqualTo(expected, within(0.001)); } - SoftAssertions.assertSoftly(softly -> { - var expected = DurationUtils.getDurationInMillisWithSubMilliPrecision( - expectedSpan.startTime(), expectedSpan.endTime()); - - if (actualSpan.duration() == null || expected == null) { - softly.assertThat(actualSpan.duration()).isEqualTo(expected); - } else { - softly.assertThat(actualSpan.duration()).isEqualTo(expected, within(0.001)); - } - }); - if (actualSpan.feedbackScores() != null) { actualSpan.feedbackScores().forEach(feedbackScore -> { assertThat(feedbackScore.createdAt()).isAfter(expectedSpan.createdAt()); diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java index 680acdb535..bf563902d0 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java @@ -39,7 +39,6 @@ import com.comet.opik.domain.cost.ModelPrice; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.podam.PodamFactoryUtils; -import com.comet.opik.utils.DurationUtils; import com.comet.opik.utils.JsonUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.uuid.Generators; @@ -54,7 +53,6 @@ import jakarta.ws.rs.core.Response; import org.apache.commons.lang3.RandomStringUtils; import org.apache.http.HttpStatus; -import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterAll; @@ -6819,83 +6817,6 @@ void getTraceStats__whenFilterByDuration__thenReturnTracesFiltered(Operator oper getStatsAndAssert(projectName, null, filters, apiKey, workspaceName, expectedStats); } - Stream getTraceStats__whenFilterByDuration__thenReturnTracesFiltered() { - return Stream.of( - arguments(Operator.EQUAL, - Duration.ofMillis(1L).toNanos() / 1000, 1.0), - arguments(Operator.GREATER_THAN, - Duration.ofMillis(8L).toNanos() / 1000, 7.0), - arguments(Operator.GREATER_THAN_EQUAL, - Duration.ofMillis(1L).toNanos() / 1000, 1.0), - arguments(Operator.GREATER_THAN_EQUAL, - Duration.ofMillis(1L).plusNanos(1000).toNanos() / 1000, 1.0), - arguments(Operator.LESS_THAN, - Duration.ofMillis(1L).plusNanos(1).toNanos() / 1000, 2.0), - arguments(Operator.LESS_THAN_EQUAL, - Duration.ofMillis(1L).toNanos() / 1000, 1.0), - arguments(Operator.LESS_THAN_EQUAL, - Duration.ofMillis(1L).toNanos() / 1000, 2.0)); - } - - @ParameterizedTest - @MethodSource - void getTraceStats__whenFilterByDuration__thenReturnTracesFiltered(Operator operator, long end, - double duration) { - String workspaceName = UUID.randomUUID().toString(); - String workspaceId = UUID.randomUUID().toString(); - String apiKey = UUID.randomUUID().toString(); - - mockTargetWorkspace(apiKey, workspaceName, workspaceId); - - var projectName = generator.generate().toString(); - var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class) - .stream() - .map(trace -> { - Instant now = Instant.now(); - return trace.toBuilder() - .projectId(null) - .usage(null) - .projectName(projectName) - .feedbackScores(null) - .totalEstimatedCost(BigDecimal.ZERO) - .startTime(now) - .endTime(Set.of(Operator.LESS_THAN, Operator.LESS_THAN_EQUAL).contains(operator) - ? Instant.now().plusSeconds(2) - : now.plusNanos(1000)) - .build(); - }) - .collect(Collectors.toCollection(ArrayList::new)); - - var start = Instant.now().truncatedTo(ChronoUnit.MILLIS); - traces.set(0, traces.getFirst().toBuilder() - .startTime(start) - .endTime(start.plus(end, ChronoUnit.MICROS)) - .build()); - - traces.forEach(expectedTrace -> create(expectedTrace, apiKey, workspaceName)); - - var expectedTraces = List.of(traces.getFirst()); - - var unexpectedTraces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class).stream() - .map(span -> span.toBuilder() - .projectId(null) - .build()) - .toList(); - - unexpectedTraces.forEach(expectedTrace -> create(expectedTrace, apiKey, workspaceName)); - - var filters = List.of( - TraceFilter.builder() - .field(TraceField.DURATION) - .operator(operator) - .value(String.valueOf(duration)) - .build()); - - var expectedStats = getProjectTraceStatItems(expectedTraces); - - getStatsAndAssert(projectName, null, filters, apiKey, workspaceName, expectedStats); - } - private void getStatsAndAssert(String projectName, UUID projectId, List filters, String apiKey, String workspaceName, List> expectedStats) { WebTarget webTarget = client.target(URL_TEMPLATE.formatted(baseURI)) From e173946da2a27461c42b5ec612850678935a778a Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 18 Dec 2024 18:11:07 +0200 Subject: [PATCH 18/22] OPIK-287 cleanup --- .../main/java/com/comet/opik/api/Trace.java | 1 - .../comet/opik/domain/ProjectMetricsDAO.java | 1 - .../opik/domain/ProjectMetricsService.java | 2 -- .../com/comet/opik/utils/DurationUtils.java | 23 ------------------- .../v1/priv/ProjectMetricsResourceTest.java | 4 ---- .../v1/priv/ProjectsResourceTest.java | 2 +- 6 files changed, 1 insertion(+), 32 deletions(-) delete mode 100644 apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java index d149229cb6..b5f04a696a 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Trace.java @@ -68,5 +68,4 @@ public static class Write { public static class Public { } } - } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java index 4ff0ab041d..cbeb50af85 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsDAO.java @@ -56,7 +56,6 @@ record Entry(String name, Instant time, Number value) { @Singleton @RequiredArgsConstructor(onConstructor_ = @Inject) class ProjectMetricsDAOImpl implements ProjectMetricsDAO { - private final @NonNull TransactionTemplateAsync template; private static final Map INTERVAL_TO_SQL = Map.of( diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java index 5c5b0b3c72..6e125ad4ee 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectMetricsService.java @@ -20,7 +20,6 @@ @ImplementedBy(ProjectMetricsServiceImpl.class) public interface ProjectMetricsService { String ERR_START_BEFORE_END = "'start_time' must be before 'end_time'"; - String ERR_PROJECT_METRIC_NOT_SUPPORTED = "metric '%s' is not supported"; Mono> getProjectMetrics(UUID projectId, ProjectMetricRequest request); } @@ -28,7 +27,6 @@ public interface ProjectMetricsService { @Slf4j @Singleton class ProjectMetricsServiceImpl implements ProjectMetricsService { - private final @NonNull Map>>> metricHandler; @Inject diff --git a/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java deleted file mode 100644 index 74ddd85010..0000000000 --- a/apps/opik-backend/src/main/java/com/comet/opik/utils/DurationUtils.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.comet.opik.utils; - -import lombok.NonNull; -import lombok.experimental.UtilityClass; - -import java.time.Duration; -import java.time.Instant; - -@UtilityClass -public class DurationUtils { - - public static final Double TIME_UNIT = 1_000.0; - - public static Double getDurationInMillisWithSubMilliPrecision(@NonNull Instant startTime, Instant endTime) { - if (endTime == null) { - return null; - } - - long micros = Duration.between(startTime, endTime).toNanos() / TIME_UNIT.longValue(); - return micros / TIME_UNIT; - } - -} diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java index 6a77f1b6e5..97a1de319b 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectMetricsResourceTest.java @@ -299,7 +299,6 @@ void getProjectMetrics__whenSessionTokenIsPresent__thenReturnProperResponse( @DisplayName("Number of traces") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class NumberOfTracesTest { - @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { @@ -441,7 +440,6 @@ private void createTraces(String projectName, Instant marker, int count) { @DisplayName("Feedback scores") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class FeedbackScoresTest { - @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { @@ -532,7 +530,6 @@ private static BigDecimal calcAverage(List scores) { @DisplayName("Token usage") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class TokenUsageTest { - @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { @@ -643,7 +640,6 @@ private void getAndAssertEmpty(UUID projectId, TimeInterval interval, Instant ma @DisplayName("Cost") @TestInstance(TestInstance.Lifecycle.PER_CLASS) class CostTest { - @ParameterizedTest @EnumSource(TimeInterval.class) void happyPath(TimeInterval interval) { diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 0a6917a334..50f30ee59f 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -17,6 +17,7 @@ import com.comet.opik.api.resources.utils.BigDecimalCollectors; import com.comet.opik.api.resources.utils.ClickHouseContainerUtils; import com.comet.opik.api.resources.utils.ClientSupportUtils; +import com.comet.opik.api.resources.utils.DurationUtils; import com.comet.opik.api.resources.utils.MigrationUtils; import com.comet.opik.api.resources.utils.MySQLContainerUtils; import com.comet.opik.api.resources.utils.RedisContainerUtils; @@ -33,7 +34,6 @@ import com.comet.opik.domain.ProjectService; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.podam.PodamFactoryUtils; -import com.comet.opik.utils.DurationUtils; import com.comet.opik.utils.JsonUtils; import com.comet.opik.utils.ValidationUtils; import com.github.tomakehurst.wiremock.client.WireMock; From ab0f8bb5074e4a12bde2b6a5af5ad161d28f1088 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 18 Dec 2024 18:18:13 +0200 Subject: [PATCH 19/22] OPIK-287 refactor --- .../com/comet/opik/api/resources/utils/StatsUtils.java | 6 ++++++ .../resources/utils/resources/TraceResourceClient.java | 9 --------- .../opik/api/resources/v1/priv/ProjectsResourceTest.java | 2 +- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java index 08e9383a39..c120fad665 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/StatsUtils.java @@ -359,4 +359,10 @@ public static Map aggregateSpansUsage(List spans) { .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), Long.valueOf(entry.getValue()))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum)); } + + public static BigDecimal aggregateSpansCost(List spans) { + return spans.stream() + .map(span -> ModelPrice.fromString(span.model()).calculateCost(span.usage())) + .reduce(BigDecimal.ZERO, BigDecimal::add); + } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java index 9966e336c0..5e05510943 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java @@ -4,12 +4,10 @@ import com.comet.opik.api.FeedbackScore; import com.comet.opik.api.FeedbackScoreBatch; import com.comet.opik.api.FeedbackScoreBatchItem; -import com.comet.opik.api.Span; import com.comet.opik.api.Trace; import com.comet.opik.api.TraceBatch; import com.comet.opik.api.TraceUpdate; import com.comet.opik.api.resources.utils.TestUtils; -import com.comet.opik.domain.cost.ModelPrice; import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.HttpHeaders; @@ -18,7 +16,6 @@ import org.apache.http.HttpStatus; import ru.vyarus.dropwizard.guice.test.ClientSupport; -import java.math.BigDecimal; import java.util.List; import java.util.UUID; @@ -143,10 +140,4 @@ public void updateTrace(UUID id, TraceUpdate traceUpdate, String apiKey, String assertThat(actualResponse.hasEntity()).isFalse(); } } - - public BigDecimal aggregateSpansCost(List spans) { - return spans.stream() - .map(span -> ModelPrice.fromString(span.model()).calculateCost(span.usage())) - .reduce(BigDecimal.ZERO, BigDecimal::add); - } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 50f30ee59f..7c093bd7cc 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1312,7 +1312,7 @@ private Project buildProjectStats(Project project, String apiKey, String workspa .build()) .toList()) .usage(StatsUtils.aggregateSpansUsage(spans)) - .totalEstimatedCost(traceResourceClient.aggregateSpansCost(spans)) + .totalEstimatedCost(StatsUtils.aggregateSpansCost(spans)) .build(); }).toList(); From 363b049f01847869d209931304044420fe1af117 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 23 Dec 2024 11:47:07 +0200 Subject: [PATCH 20/22] OPIK-287 PR comments --- .../src/main/java/com/comet/opik/domain/TraceDAO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java index f85e644320..6a174dce63 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java @@ -1176,7 +1176,7 @@ public Mono getStats(@NonNull TraceSearchCriteria criteria) { ST statsSQL = newFindTemplate(SELECT_TRACES_STATS, criteria); var statement = connection.createStatement(statsSQL.render()) - .bind("project_ids", List.of(criteria.projectId()).toArray(UUID[]::new)); + .bind("project_ids", List.of(criteria.projectId())); bindSearchCriteria(criteria, statement); @@ -1210,7 +1210,7 @@ public Mono> getStatsByProjectIds(@NonNull List pr return asyncTemplate .nonTransaction(connection -> { Statement statement = connection.createStatement(new ST(SELECT_TRACES_STATS).render()) - .bind("project_ids", projectIds.toArray(UUID[]::new)) + .bind("project_ids", projectIds) .bind("workspace_id", workspaceId); return Mono.from(statement.execute()) From 7d0cdc9e5f66882c1033041edd6a283b9971d9ad Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 23 Dec 2024 11:51:54 +0200 Subject: [PATCH 21/22] OPIK-287 PR comments --- .../com/comet/opik/domain/ProjectService.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java index c5c809d068..45ab2d6108 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java @@ -204,17 +204,16 @@ public Project get(@NonNull UUID id, @NonNull String workspaceId) { Map> projectStats = getProjectStats(List.of(id), workspaceId); - return enhanceProject(project, lastUpdatedTraceAt, projectStats); + return enhanceProject(project, lastUpdatedTraceAt.get(project.id()), projectStats.get(project.id())); } - private Project enhanceProject(Project project, Map lastUpdatedTraceAt, - Map> projectStats) { + private Project enhanceProject(Project project, Instant lastUpdatedTraceAt, Map projectStats) { return project.toBuilder() - .lastUpdatedTraceAt(lastUpdatedTraceAt.get(project.id())) - .feedbackScores(StatsMapper.getStatsFeedbackScores(projectStats.get(project.id()))) - .duration(StatsMapper.getStatsDuration(projectStats.get(project.id()))) - .totalEstimatedCost(StatsMapper.getStatsTotalEstimatedCost(projectStats.get(project.id()))) - .usage(StatsMapper.getStatsUsage(projectStats.get(project.id()))) + .lastUpdatedTraceAt(lastUpdatedTraceAt) + .feedbackScores(StatsMapper.getStatsFeedbackScores(projectStats)) + .duration(StatsMapper.getStatsDuration(projectStats)) + .totalEstimatedCost(StatsMapper.getStatsTotalEstimatedCost(projectStats)) + .usage(StatsMapper.getStatsUsage(projectStats)) .build(); } @@ -291,7 +290,8 @@ public Page find(int page, int size, @NonNull ProjectCriteria criteria, List projects = projectRecordSet.content() .stream() - .map(project -> enhanceProject(project, projectLastUpdatedTraceAtMap, projectStats)) + .map(project -> enhanceProject(project, projectLastUpdatedTraceAtMap.get(project.id()), + projectStats.get(project.id()))) .toList(); return new ProjectPage(page, projects.size(), projectRecordSet.total(), projects, @@ -365,7 +365,8 @@ private Page findWithLastTraceSorting(int page, int size, @NonNull Proj // compose the final projects list by the correct order and add last trace to it List projects = finalIds.stream().map(projectsById::get) - .map(project -> enhanceProject(project, projectLastUpdatedTraceAtMap, projectStats)) + .map(project -> enhanceProject(project, projectLastUpdatedTraceAtMap.get(project.id()), + projectStats.get(project.id()))) .toList(); return new ProjectPage(page, projects.size(), allProjectIdsLastUpdated.size(), projects, @@ -448,7 +449,8 @@ public Project retrieveByName(@NonNull String projectName) { Map> projectStats = getProjectStats(List.of(project.id()), workspaceId); - return enhanceProject(project, projectLastUpdatedTraceAtMap, projectStats); + return enhanceProject(project, projectLastUpdatedTraceAtMap.get(project.id()), + projectStats.get(project.id())); }) .orElseThrow(this::createNotFoundError); }); From 7a8905ce15d0b3dff9547e5015845ded88216f8a Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Mon, 23 Dec 2024 11:53:41 +0200 Subject: [PATCH 22/22] OPIK-287 PR comments --- .../resources/utils/BigDecimalCollectors.java | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java index 0de3476a8b..c34e7250e5 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/BigDecimalCollectors.java @@ -8,28 +8,6 @@ import java.util.stream.Collector; public class BigDecimalCollectors { - - public static Collector averagingBigDecimal() { - return Collector.of( - // Supplier: Create an array with two elements to hold total and count - () -> new BigDecimal[]{BigDecimal.ZERO, BigDecimal.ZERO}, - // Accumulator: Update total and count - (result, value) -> { - result[0] = result[0].add(value); // Accumulate total - result[1] = result[1].add(BigDecimal.ONE); // Increment count - }, - // Combiner: Merge two arrays (used for parallel streams) - (result1, result2) -> { - result1[0] = result1[0].add(result2[0]); // Combine totals - result1[1] = result1[1].add(result2[1]); // Combine counts - return result1; - }, - // Finisher: Compute the average (total / count) with rounding - result -> result[1].compareTo(BigDecimal.ZERO) == 0 - ? BigDecimal.ZERO // Avoid division by zero - : result[0].divide(result[1], ValidationUtils.SCALE, RoundingMode.HALF_UP)); - } - public static Collector averagingBigDecimal(Function mapper) { return Collector.of( () -> new BigDecimal[]{BigDecimal.ZERO, BigDecimal.ZERO}, @@ -49,5 +27,4 @@ public class BigDecimalCollectors { ? BigDecimal.ZERO : result[0].divide(result[1], ValidationUtils.SCALE, RoundingMode.HALF_UP)); } - }