Skip to content

Commit

Permalink
[OPIK-446] Add duration to trace and span (#883)
Browse files Browse the repository at this point in the history
* [OPIK-446] Add duration to trace_span

* Fix calc

* Fix pr review

* OPIK-446 remove spans calculated duration

* OPIK-446 remove traces calculated duration

* OPIK-446 changed DurationUtils to be a test utility

* OPIK-446 changed query builder to use the materialized column

---------

Co-authored-by: Ido Berkovich <[email protected]>
  • Loading branch information
thiagohora and idoberko2 authored Dec 18, 2024
1 parent b3e9276 commit a012ea6
Show file tree
Hide file tree
Showing 15 changed files with 670 additions and 170 deletions.
4 changes: 3 additions & 1 deletion apps/opik-backend/src/main/java/com/comet/opik/api/Span.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public record Span(
@JsonView({
Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) List<FeedbackScore> feedbackScores,
@JsonView({
Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) BigDecimal totalEstimatedCost){
Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) BigDecimal totalEstimatedCost,
@JsonView({
Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") Double duration){

public record SpanPage(
@JsonView(Span.View.Public.class) int page,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public record Trace(
@JsonView({
Trace.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) List<FeedbackScore> feedbackScores,
@JsonView({
Trace.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) BigDecimal totalEstimatedCost){
Trace.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) BigDecimal totalEstimatedCost,
@JsonView({
Trace.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY, description = "Duration in milliseconds as a decimal number to support sub-millisecond precision") Double duration){

public record TracePage(
@JsonView(Trace.View.Public.class) int page,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public interface Field {
String USAGE_PROMPT_TOKENS_QUERY_PARAM = "usage.prompt_tokens";
String USAGE_TOTAL_TOKEN_QUERY_PARAMS = "usage.total_tokens";
String FEEDBACK_SCORES_QUERY_PARAM = "feedback_scores";
String DURATION_QUERY_PARAM = "duration";

@JsonValue
String getQueryParamField();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public enum SpanField implements Field {
USAGE_PROMPT_TOKENS(USAGE_PROMPT_TOKENS_QUERY_PARAM, FieldType.NUMBER),
USAGE_TOTAL_TOKENS(USAGE_TOTAL_TOKEN_QUERY_PARAMS, FieldType.NUMBER),
FEEDBACK_SCORES(FEEDBACK_SCORES_QUERY_PARAM, FieldType.FEEDBACK_SCORES_NUMBER),
;
DURATION(DURATION_QUERY_PARAM, FieldType.NUMBER);

private final String queryParamField;
private final FieldType type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public enum TraceField implements Field {
USAGE_PROMPT_TOKENS(USAGE_PROMPT_TOKENS_QUERY_PARAM, FieldType.NUMBER),
USAGE_TOTAL_TOKENS(USAGE_TOTAL_TOKEN_QUERY_PARAMS, FieldType.NUMBER),
FEEDBACK_SCORES(FEEDBACK_SCORES_QUERY_PARAM, FieldType.FEEDBACK_SCORES_NUMBER),
;
DURATION(DURATION_QUERY_PARAM, FieldType.NUMBER);

private final String queryParamField;
private final FieldType type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class SpansResource {
@Operation(operationId = "getSpansByProject", summary = "Get spans by project_name or project_id and optionally by trace_id and/or type", description = "Get spans by project_name or project_id and optionally by trace_id and/or type", responses = {
@ApiResponse(responseCode = "200", description = "Spans resource", content = @Content(schema = @Schema(implementation = SpanPage.class)))})
@JsonView(Span.View.Public.class)
public Response getByProjectId(
public Response getSpansByProject(
@QueryParam("page") @Min(1) @DefaultValue("1") int page,
@QueryParam("size") @Min(1) @DefaultValue("10") int size,
@QueryParam("project_name") String projectName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class TracesResource {
@Operation(operationId = "getTracesByProject", summary = "Get traces by project_name or project_id", description = "Get traces by project_name or project_id", responses = {
@ApiResponse(responseCode = "200", description = "Trace resource", content = @Content(schema = @Schema(implementation = TracePage.class)))})
@JsonView(Trace.View.Public.class)
public Response getByProjectId(
public Response getTracesByProject(
@QueryParam("page") @Min(1) @DefaultValue("1") int page,
@QueryParam("size") @Min(1) @DefaultValue("10") int size,
@QueryParam("project_name") String projectName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ LEFT JOIN (

private static final String SELECT_BY_ID = """
SELECT
*
*,
duration_millis
FROM
spans
WHERE id = :id
Expand Down Expand Up @@ -511,7 +512,8 @@ LEFT JOIN (
created_at,
last_updated_at,
created_by,
last_updated_by
last_updated_by,
duration_millis
FROM spans
WHERE project_id = :project_id
AND workspace_id = :workspace_id
Expand Down Expand Up @@ -607,7 +609,7 @@ AND id in (
SELECT
project_id as project_id,
count(DISTINCT span_id) as span_count,
arrayMap(v -> if(isNaN(v), 0, toDecimal64(v / 1000.0, 9)), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
arrayMap(v -> toDecimal64(if(isNaN(v), 0, v), 9), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
sum(input_count) as input,
sum(output_count) as output,
sum(metadata_count) as metadata,
Expand All @@ -621,7 +623,7 @@ AND id in (
s.workspace_id as workspace_id,
s.project_id as project_id,
s.id as span_id,
s.duration as duration,
s.duration_millis as duration,
s.input_count as input_count,
s.output_count as output_count,
s.metadata_count as metadata_count,
Expand All @@ -634,7 +636,7 @@ AND id in (
workspace_id,
project_id,
id,
if(end_time IS NOT NULL, date_diff('microsecond', start_time, end_time), null) as duration,
duration_millis,
if(length(input) > 0, 1, 0) as input_count,
if(length(output) > 0, 1, 0) as output_count,
if(length(metadata) > 0, 1, 0) as metadata_count,
Expand Down Expand Up @@ -1094,6 +1096,7 @@ private Publisher<Span> mapToDto(Result result) {
.lastUpdatedAt(row.get("last_updated_at", Instant.class))
.createdBy(row.get("created_by", String.class))
.lastUpdatedBy(row.get("last_updated_by", String.class))
.duration(row.get("duration_millis", Double.class))
.build();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ public interface SpanMapper {
void updateSpanModelBuilder(@MappingTarget SpanModel.SpanModelBuilder spanModelBuilder, SpanUpdate spanUpdate);

@BeanMapping(nullValuePropertyMappingStrategy = NullValuePropertyMappingStrategy.IGNORE)
@Mapping(target = "duration", ignore = true)
void updateSpanBuilder(@MappingTarget Span.SpanBuilder spanBuilder, SpanUpdate spanUpdate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,13 @@ INSERT INTO traces (
private static final String SELECT_BY_ID = """
SELECT
t.*,
t.duration_millis,
sumMap(s.usage) as usage,
sum(s.total_estimated_cost) as total_estimated_cost
FROM (
SELECT
*
*,
duration_millis
FROM traces
WHERE workspace_id = :workspace_id
AND id = :id
Expand All @@ -290,14 +292,16 @@ LEFT JOIN (
LIMIT 1 BY id
) AS s ON t.id = s.trace_id
GROUP BY
t.*
t.*,
duration_millis
ORDER BY t.id DESC
;
""";

private static final String SELECT_BY_PROJECT_ID = """
SELECT
t.*,
t.duration_millis,
sumMap(s.usage) as usage,
sum(s.total_estimated_cost) as total_estimated_cost
FROM (
Expand All @@ -316,7 +320,8 @@ LEFT JOIN (
created_at,
last_updated_at,
created_by,
last_updated_by
last_updated_by,
duration_millis
FROM traces
WHERE project_id = :project_id
AND workspace_id = :workspace_id
Expand Down Expand Up @@ -354,7 +359,8 @@ LEFT JOIN (
LIMIT 1 BY id
) AS s ON t.id = s.trace_id
GROUP BY
t.*
t.*,
t.duration_millis
<if(trace_aggregation_filters)>
HAVING <trace_aggregation_filters>
<endif>
Expand Down Expand Up @@ -587,7 +593,7 @@ LEFT JOIN (
SELECT
project_id as project_id,
count(DISTINCT trace_id) as trace_count,
arrayMap(v -> if(isNaN(v), 0, toDecimal64(v / 1000.0, 9)), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
arrayMap(v -> toDecimal64(if(isNaN(v), 0, v), 9), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
sum(input_count) as input,
sum(output_count) as output,
sum(metadata_count) as metadata,
Expand All @@ -601,7 +607,7 @@ LEFT JOIN (
t.workspace_id as workspace_id,
t.project_id as project_id,
t.id as trace_id,
t.duration as duration,
t.duration_millis as duration,
t.input_count as input_count,
t.output_count as output_count,
t.metadata_count as metadata_count,
Expand All @@ -614,7 +620,7 @@ LEFT JOIN (
workspace_id,
project_id,
id,
if(end_time IS NOT NULL, date_diff('microsecond', start_time, end_time), null) as duration,
duration_millis,
if(length(input) > 0, 1, 0) as input_count,
if(length(output) > 0, 1, 0) as output_count,
if(length(metadata) > 0, 1, 0) as metadata_count,
Expand Down Expand Up @@ -934,6 +940,7 @@ private Publisher<Trace> mapToDto(Result result) {
.lastUpdatedAt(row.get("last_updated_at", Instant.class))
.createdBy(row.get("created_by", String.class))
.lastUpdatedBy(row.get("last_updated_by", String.class))
.duration(row.get("duration_millis", Double.class))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class FilterQueryBuilder {
private static final String USAGE_PROMPT_TOKENS_ANALYTICS_DB = "usage['prompt_tokens']";
private static final String USAGE_TOTAL_TOKENS_ANALYTICS_DB = "usage['total_tokens']";
private static final String VALUE_ANALYTICS_DB = "value";
private static final String DURATION_ANALYTICS_DB = "duration_millis";

private static final Map<Operator, Map<FieldType, String>> ANALYTICS_DB_OPERATOR_MAP = new EnumMap<>(Map.of(
Operator.CONTAINS, new EnumMap<>(Map.of(
Expand Down Expand Up @@ -112,6 +113,7 @@ public class FilterQueryBuilder {
.put(TraceField.USAGE_PROMPT_TOKENS, USAGE_PROMPT_TOKENS_ANALYTICS_DB)
.put(TraceField.USAGE_TOTAL_TOKENS, USAGE_TOTAL_TOKENS_ANALYTICS_DB)
.put(TraceField.FEEDBACK_SCORES, VALUE_ANALYTICS_DB)
.put(TraceField.DURATION, DURATION_ANALYTICS_DB)
.build());

private static final Map<SpanField, String> SPAN_FIELDS_MAP = new EnumMap<>(
Expand All @@ -131,6 +133,7 @@ public class FilterQueryBuilder {
.put(SpanField.USAGE_PROMPT_TOKENS, USAGE_PROMPT_TOKENS_ANALYTICS_DB)
.put(SpanField.USAGE_TOTAL_TOKENS, USAGE_TOTAL_TOKENS_ANALYTICS_DB)
.put(SpanField.FEEDBACK_SCORES, VALUE_ANALYTICS_DB)
.put(SpanField.DURATION, DURATION_ANALYTICS_DB)
.build());

private static final Map<ExperimentsComparisonValidKnownField, String> EXPERIMENTS_COMPARISON_FIELDS_MAP = new EnumMap<>(
Expand All @@ -149,6 +152,7 @@ public class FilterQueryBuilder {
.add(TraceField.OUTPUT)
.add(TraceField.METADATA)
.add(TraceField.TAGS)
.add(TraceField.DURATION)
.build()),
FilterStrategy.TRACE_AGGREGATION, EnumSet.copyOf(ImmutableSet.<TraceField>builder()
.add(TraceField.USAGE_COMPLETION_TOKENS)
Expand All @@ -171,6 +175,7 @@ public class FilterQueryBuilder {
.add(SpanField.USAGE_COMPLETION_TOKENS)
.add(SpanField.USAGE_PROMPT_TOKENS)
.add(SpanField.USAGE_TOTAL_TOKENS)
.add(SpanField.DURATION)
.build()),
FilterStrategy.FEEDBACK_SCORES, ImmutableSet.<Field>builder()
.add(TraceField.FEEDBACK_SCORES)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
--liquibase formatted sql
--changeset idoberko2:add_duration_columns
--comment: Adds a materialized duration_millis column to spans and traces - the start_time to end_time difference in milliseconds computed from a microsecond dateDiff, NULL when either timestamp is NULL or start_time equals 1970-01-01 00:00:00

ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.spans
ADD COLUMN IF NOT EXISTS duration_millis Nullable(Float64) MATERIALIZED
if(end_time IS NOT NULL AND start_time IS NOT NULL
AND notEquals(start_time, toDateTime64('1970-01-01 00:00:00.000', 9)),
(dateDiff('microsecond', start_time, end_time) / 1000.0),
NULL);

ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.traces
ADD COLUMN IF NOT EXISTS duration_millis Nullable(Float64) MATERIALIZED
if(end_time IS NOT NULL AND start_time IS NOT NULL
AND notEquals(start_time, toDateTime64('1970-01-01 00:00:00.000', 9)),
(dateDiff('microsecond', start_time, end_time) / 1000.0),
NULL);

--rollback ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.spans DROP COLUMN IF EXISTS duration_millis;
--rollback ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.traces DROP COLUMN IF EXISTS duration_millis;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.comet.opik.api.resources.utils;

import lombok.NonNull;
import lombok.experimental.UtilityClass;

import java.time.Duration;
import java.time.Instant;

/**
 * Helper for computing the elapsed time between two instants, expressed in
 * milliseconds with a fractional part.
 */
@UtilityClass
public class DurationUtils {

    /** Ratio between adjacent time units used here (nanos to micros, micros to millis). */
    public static final Double TIME_UNIT = 1_000.0;

    /**
     * Computes the time elapsed from {@code startTime} to {@code endTime} in milliseconds.
     *
     * <p>The nanosecond difference is first reduced to whole microseconds by integer
     * division, so the result carries at most three decimal places of a millisecond.
     *
     * @param startTime beginning of the interval; annotated {@code @NonNull}
     * @param endTime end of the interval; may be {@code null}
     * @return the duration in milliseconds, or {@code null} when {@code endTime} is {@code null}
     */
    public static Double getDurationInMillisWithSubMilliPrecision(@NonNull Instant startTime, Instant endTime) {
        if (endTime != null) {
            long elapsedNanos = Duration.between(startTime, endTime).toNanos();
            // Integer division: anything finer than a microsecond is dropped.
            long wholeMicros = elapsedNanos / TIME_UNIT.longValue();
            return wholeMicros / TIME_UNIT;
        }

        return null;
    }

}
Loading

0 comments on commit a012ea6

Please sign in to comment.