[OPIK-446] Add duration metric
thiagohora committed Dec 13, 2024
1 parent 57dd59b commit 19bdb8b
Showing 6 changed files with 205 additions and 46 deletions.
@@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToMono;
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
@@ -35,11 +36,16 @@
public interface ProjectMetricsDAO {
String NAME_TRACES = "traces";
String NAME_COST = "cost";
String NAME_DURATION_P50 = "duration.p50";
String NAME_DURATION_P90 = "duration.p90";
String NAME_DURATION_P99 = "duration.p99";

@Builder
record Entry(String name, Instant time, Number value) {

}

Mono<List<Entry>> getDuration(UUID projectId, ProjectMetricRequest request);
Mono<List<Entry>> getTraceCount(@NonNull UUID projectId, @NonNull ProjectMetricRequest request);
Mono<List<Entry>> getFeedbackScores(@NonNull UUID projectId, @NonNull ProjectMetricRequest request);
Mono<List<Entry>> getTokenUsage(@NonNull UUID projectId, @NonNull ProjectMetricRequest request);
@@ -50,13 +56,33 @@ record Entry(String name, Instant time, Number value) {
@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
class ProjectMetricsDAOImpl implements ProjectMetricsDAO {

private final @NonNull TransactionTemplateAsync template;

private static final Map<TimeInterval, String> INTERVAL_TO_SQL = Map.of(
TimeInterval.WEEKLY, "toIntervalWeek(1)",
TimeInterval.DAILY, "toIntervalDay(1)",
TimeInterval.HOURLY, "toIntervalHour(1)");

private static final String GET_TRACE_DURATION = """
SELECT <bucket> AS bucket,
arrayMap(v ->
toDecimal64(if(isNaN(v), 0, v), 9),
quantiles(0.5, 0.9, 0.99)(duration)
) AS duration
FROM traces
WHERE project_id = :project_id
AND workspace_id = :workspace_id
AND start_time >= parseDateTime64BestEffort(:start_time, 9)
AND start_time \\<= parseDateTime64BestEffort(:end_time, 9)
GROUP BY bucket
ORDER BY bucket
WITH FILL
FROM <fill_from>
TO parseDateTimeBestEffort(:end_time)
STEP <step>;
""";

private static final String GET_TRACE_COUNT = """
SELECT <bucket> AS bucket,
nullIf(count(DISTINCT id), 0) as count
@@ -151,6 +177,41 @@ TO parseDateTimeBestEffort(:end_time)
STEP <step>;
""";

@Override
public Mono<List<Entry>> getDuration(@NonNull UUID projectId, @NonNull ProjectMetricRequest request) {
return template.nonTransaction(connection -> getMetric(projectId, request, connection,
GET_TRACE_DURATION, "traceDuration")
.flatMapMany(result -> result.map((row, metadata) -> mapDuration(row)))
.reduce(Stream::concat)
.map(Stream::toList));
}

private Stream<Entry> mapDuration(Row row) {
return Optional.ofNullable(row.get("duration", List.class))
.map(durations -> Stream.of(
Entry.builder().name(NAME_DURATION_P50)
.time(row.get("bucket", Instant.class))
.value(getP(durations, 0))
.build(),
Entry.builder().name(NAME_DURATION_P90)
.time(row.get("bucket", Instant.class))
.value(getP(durations, 1))
.build(),
Entry.builder().name(NAME_DURATION_P99)
.time(row.get("bucket", Instant.class))
.value(getP(durations, 2))
.build()))
.orElse(Stream.empty());
}

private static BigDecimal getP(List durations, int index) {
if (durations.size() <= index) {
return null;
}

return (BigDecimal) durations.get(index);
}
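
For illustration only, here is how a single bucket row expands into three data points, reusing the Entry record and NAME_DURATION_* constants declared above; the ClickHouse quantiles array holds p50, p90 and p99 in that order, and getP returns null rather than throwing when a quantile is missing.

// Illustrative values only; this mirrors what mapDuration builds for one row.
Instant bucket = Instant.parse("2024-12-13T10:00:00Z");
List<BigDecimal> durations = List.of(
        new BigDecimal("12.5"),   // p50, in milliseconds
        new BigDecimal("40.25"),  // p90
        new BigDecimal("95.0"));  // p99

List<Entry> points = List.of(
        Entry.builder().name(NAME_DURATION_P50).time(bucket).value(durations.get(0)).build(),
        Entry.builder().name(NAME_DURATION_P90).time(bucket).value(durations.get(1)).build(),
        Entry.builder().name(NAME_DURATION_P99).time(bucket).value(durations.get(2)).build());
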

@Override
public Mono<List<Entry>> getTraceCount(@NonNull UUID projectId, @NonNull ProjectMetricRequest request) {
return template.nonTransaction(connection -> getMetric(projectId, request, connection,
@@ -9,7 +9,6 @@
import jakarta.inject.Singleton;
import jakarta.ws.rs.BadRequestException;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@@ -30,15 +29,23 @@ public interface ProjectMetricsService {

@Slf4j
@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
class ProjectMetricsServiceImpl implements ProjectMetricsService {
private final @NonNull ProjectMetricsDAO projectMetricsDAO;

private final @NonNull Map<MetricType, BiFunction<UUID, ProjectMetricRequest, Mono<List<ProjectMetricsDAO.Entry>>>> metricHandler;

@Inject
public ProjectMetricsServiceImpl(@NonNull ProjectMetricsDAO projectMetricsDAO) {
metricHandler = Map.of(
MetricType.TRACE_COUNT, projectMetricsDAO::getTraceCount,
MetricType.FEEDBACK_SCORES, projectMetricsDAO::getFeedbackScores,
MetricType.TOKEN_USAGE, projectMetricsDAO::getTokenUsage,
MetricType.COST, projectMetricsDAO::getCost,
MetricType.DURATION, projectMetricsDAO::getDuration);
}

@Override
public Mono<ProjectMetricResponse<Number>> getProjectMetrics(UUID projectId, ProjectMetricRequest request) {
return getMetricHandler(request.metricType())
.orElseThrow(
() -> new BadRequestException(ERR_PROJECT_METRIC_NOT_SUPPORTED.formatted(request.metricType())))
.apply(projectId, request)
.map(dataPoints -> ProjectMetricResponse.builder()
.projectId(projectId)
@@ -71,15 +78,8 @@ private List<ProjectMetricResponse.Results<Number>> entriesToResults(List<Projec
.toList();
}

private Optional<BiFunction<UUID, ProjectMetricRequest, Mono<List<ProjectMetricsDAO.Entry>>>> getMetricHandler(
MetricType metricType) {
Map<MetricType, BiFunction<UUID, ProjectMetricRequest, Mono<List<ProjectMetricsDAO.Entry>>>> HANDLER_BY_TYPE = Map
.of(
MetricType.TRACE_COUNT, projectMetricsDAO::getTraceCount,
MetricType.FEEDBACK_SCORES, projectMetricsDAO::getFeedbackScores,
MetricType.TOKEN_USAGE, projectMetricsDAO::getTokenUsage,
MetricType.COST, projectMetricsDAO::getCost);

return Optional.ofNullable(HANDLER_BY_TYPE.get(metricType));
private BiFunction<UUID, ProjectMetricRequest, Mono<List<ProjectMetricsDAO.Entry>>> getMetricHandler(MetricType metricType) {
return Optional.ofNullable(metricHandler.get(metricType))
.orElseThrow(() -> new BadRequestException(ERR_PROJECT_METRIC_NOT_SUPPORTED.formatted(metricType)));
}
}
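
The constructor in ProjectMetricsServiceImpl builds the handler map once at injection time, replacing the map the old getMetricHandler recreated on every call. A small self-contained sketch of the same resolve-or-fail dispatch pattern, with generic names standing in for the Opik types:

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

class MetricDispatchSketch {
    enum Metric { TRACE_COUNT, DURATION }

    // Built once; the method references play the role of the DAO calls above.
    private final Map<Metric, Function<String, String>> handlers = Map.of(
            Metric.TRACE_COUNT, projectId -> "trace count for " + projectId,
            Metric.DURATION, projectId -> "duration percentiles for " + projectId);

    String handle(Metric metric, String projectId) {
        return Optional.ofNullable(handlers.get(metric))
                .orElseThrow(() -> new IllegalArgumentException("unsupported metric: " + metric))
                .apply(projectId);
    }

    public static void main(String[] args) {
        System.out.println(new MetricDispatchSketch().handle(Metric.DURATION, "project-123"));
    }
}
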
@@ -589,7 +589,7 @@ AND id in (
SELECT
project_id as project_id,
count(DISTINCT span_id) as span_count,
arrayMap(v -> if(isNaN(v), 0, toDecimal64(v / 1000.0, 9)), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
arrayMap(v -> toDecimal64(if(isNaN(v), 0, v), 9), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
sum(input_count) as input,
sum(output_count) as output,
sum(metadata_count) as metadata,
@@ -616,7 +616,7 @@ AND id in (
workspace_id,
project_id,
id,
if(end_time IS NOT NULL, date_diff('microsecond', start_time, end_time), null) as duration,
duration,
if(length(input) > 0, 1, 0) as input_count,
if(length(output) > 0, 1, 0) as output_count,
if(length(metadata) > 0, 1, 0) as metadata_count,
@@ -570,7 +570,7 @@ LEFT JOIN (
SELECT
project_id as project_id,
count(DISTINCT trace_id) as trace_count,
arrayMap(v -> if(isNaN(v), 0, toDecimal64(v / 1000.0, 9)), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
arrayMap(v -> toDecimal64(if(isNaN(v), 0, v), 9), quantiles(0.5, 0.9, 0.99)(duration)) AS duration,
sum(input_count) as input,
sum(output_count) as output,
sum(metadata_count) as metadata,
@@ -597,7 +597,7 @@ LEFT JOIN (
workspace_id,
project_id,
id,
if(end_time IS NOT NULL, date_diff('microsecond', start_time, end_time), null) as duration,
duration,
if(length(input) > 0, 1, 0) as input_count,
if(length(output) > 0, 1, 0) as output_count,
if(length(metadata) > 0, 1, 0) as metadata_count,
@@ -0,0 +1,11 @@
--liquibase formatted sql
--changeset thiagohora:add_duration_columns

ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.spans
ADD COLUMN IF NOT EXISTS duration Nullable(Float64) MATERIALIZED if(end_time IS NOT NULL, (dateDiff('microsecond', start_time, end_time) / 1000.0), NULL);

ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.traces
ADD COLUMN IF NOT EXISTS duration Nullable(Float64) MATERIALIZED if(end_time IS NOT NULL, (dateDiff('microsecond', start_time, end_time) / 1000.0), NULL);

--rollback ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.spans DROP COLUMN IF EXISTS duration;
--rollback ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.traces DROP COLUMN IF EXISTS duration;
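
The materialized column stores duration as fractional milliseconds (the microsecond difference divided by 1000), which is why the stats queries above no longer divide by 1000 themselves. A self-contained sketch of the same arithmetic, assuming an end_time 1.50025 seconds after start_time:

import java.time.Duration;
import java.time.Instant;

class DurationColumnSketch {
    public static void main(String[] args) {
        Instant start = Instant.parse("2024-12-13T10:00:00.000000Z");
        Instant end = Instant.parse("2024-12-13T10:00:01.500250Z");

        long micros = Duration.between(start, end).toNanos() / 1_000;  // dateDiff('microsecond', start_time, end_time)
        double durationMs = micros / 1_000.0;                          // ... / 1000.0 in the migration

        System.out.println(durationMs); // 1500.25 milliseconds
    }
}
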