Skip to content

Commit

Permalink
[OPIK-432] Project Metrics MVP (#678)
Browse files Browse the repository at this point in the history
* OPIK-432 project metrics traces count failing test [WIP]

* OPIK-432 project metrics traces count failing test

* OPIK-432 project metrics traces count failing test

* OPIK-432 project metrics traces count failing test

* OPIK-432 project metrics traces count implementation [WIP]

* OPIK-432 project metrics traces count implementation [WIP]

* OPIK-432 project metrics traces count implementation [WIP]

* OPIK-432 project metrics traces count implementation [WIP]

* OPIK-432 project metrics traces count implementation [WIP]

* OPIK-432 project metrics traces count failing test green

* OPIK-432 validations failing test

* OPIK-432 validations failing test green

* OPIK-432 refactor

* OPIK-432 cover auth

* OPIK-432 code style

* OPIK-432 code simplify

* OPIK-432 use number

* OPIK-432 fix comparison

* OPIK-432 pr comments

* OPIK-432 pr comments

* OPIK-432 pr comments

* OPIK-432 pr comments
  • Loading branch information
idoberko2 authored Nov 21, 2024
1 parent ec6e857 commit ce8690a
Show file tree
Hide file tree
Showing 10 changed files with 621 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.comet.opik.api;

import lombok.Builder;

import java.time.Instant;

@Builder(toBuilder = true)
public record DataPoint(Instant time, Number value) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.comet.opik.api;

public enum TimeInterval {
HOURLY,
DAILY,
WEEKLY,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.comet.opik.api.metrics;

public enum MetricType {
FEEDBACK_SCORES,
TRACE_COUNT,
TOKEN_USAGE,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.comet.opik.api.metrics;

import com.comet.opik.api.TimeInterval;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;
import lombok.NonNull;

import java.time.Instant;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ProjectMetricRequest(
@NonNull MetricType metricType,
@NonNull TimeInterval interval,
Instant intervalStart,
Instant intervalEnd) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.comet.opik.api.metrics;

import com.comet.opik.api.DataPoint;
import com.comet.opik.api.TimeInterval;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ProjectMetricResponse(
UUID projectId,
MetricType metricType,
TimeInterval interval,
List<Results> results) {

public static final ProjectMetricResponse EMPTY = ProjectMetricResponse.builder()
.results(List.of())
.build();

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record Results(String name, List<DataPoint> data) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import com.comet.opik.api.ProjectRetrieve;
import com.comet.opik.api.ProjectUpdate;
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.api.metrics.ProjectMetricRequest;
import com.comet.opik.api.metrics.ProjectMetricResponse;
import com.comet.opik.api.sorting.SortingFactoryProjects;
import com.comet.opik.api.sorting.SortingField;
import com.comet.opik.domain.ProjectMetricsService;
import com.comet.opik.domain.ProjectService;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.ratelimit.RateLimited;
Expand Down Expand Up @@ -45,6 +48,8 @@
import java.util.List;
import java.util.UUID;

import static com.comet.opik.utils.AsyncUtils.setRequestContext;

@Path("/v1/private/projects")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
Expand All @@ -57,6 +62,7 @@ public class ProjectsResource {
private final @NonNull ProjectService projectService;
private final @NonNull Provider<RequestContext> requestContext;
private final @NonNull SortingFactoryProjects sortingFactory;
private final @NonNull ProjectMetricsService metricsService;

@GET
@Operation(operationId = "findProjects", summary = "Find projects", description = "Find projects", responses = {
Expand Down Expand Up @@ -183,4 +189,27 @@ public Response retrieveProject(
return Response.ok().entity(project).build();
}

@POST
@Path("/{id}/metrics")
@Operation(operationId = "getProjectMetrics", summary = "Get Project Metrics", description = "Gets specified metrics for a project", responses = {
@ApiResponse(responseCode = "200", description = "Project Metrics", content = @Content(schema = @Schema(implementation = ProjectMetricResponse.class))),
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "404", description = "Not Found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))
})
@JsonView({Project.View.Public.class})
public Response getProjectMetrics(
@PathParam("id") UUID projectId,
@RequestBody(content = @Content(schema = @Schema(implementation = ProjectMetricRequest.class))) @Valid ProjectMetricRequest request) {
String workspaceId = requestContext.get().getWorkspaceId();

log.info("Retrieve project metrics for projectId '{}', on workspace_id '{}', metric '{}'", projectId,
workspaceId, request.metricType());
ProjectMetricResponse response = metricsService.getProjectMetrics(projectId, request)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();
log.info("Retrieved project id metrics for projectId '{}', on workspace_id '{}', metric '{}'", projectId,
workspaceId, request.metricType());

return Response.ok().entity(response).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.comet.opik.domain;

import com.comet.opik.api.DataPoint;
import com.comet.opik.api.metrics.ProjectMetricRequest;
import com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToMono;
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment;
import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware;

@ImplementedBy(ProjectMetricsDAOImpl.class)
public interface ProjectMetricsDAO {
Mono<List<DataPoint>> getTraceCount(UUID projectId, ProjectMetricRequest request, Connection connection);
}

@Slf4j
@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
class ProjectMetricsDAOImpl implements ProjectMetricsDAO {
private static final String GET_TRACE_COUNT = """
SELECT toStartOfInterval(start_time, toIntervalHour(1)) AS bucket,
count() as count
FROM traces
WHERE project_id = :project_id
AND workspace_id = :workspace_id
AND start_time >= parseDateTime64BestEffort(:start_time, 9)
AND end_time \\<= parseDateTime64BestEffort(:end_time, 9)
GROUP BY bucket
ORDER BY bucket
WITH FILL
FROM parseDateTimeBestEffort(:start_time)
TO parseDateTimeBestEffort(:end_time)
STEP toIntervalHour(1);
""";

@Override
public Mono<List<DataPoint>> getTraceCount(
UUID projectId, ProjectMetricRequest request, Connection connection) {
return getTracesCountForProject(projectId, request, connection)
.flatMapMany(this::mapToIntDataPoint)
.collectList();
}

private Mono<? extends Result> getTracesCountForProject(
UUID projectId, ProjectMetricRequest request, Connection connection) {
var template = new ST(GET_TRACE_COUNT);
var statement = connection.createStatement(template.render())
.bind("project_id", projectId)
.bind("start_time", request.intervalStart().toString())
.bind("end_time", request.intervalEnd().toString());

InstrumentAsyncUtils.Segment segment = startSegment("traceCount", "Clickhouse", "get");

return makeMonoContextAware(bindWorkspaceIdToMono(statement))
.doFinally(signalType -> endSegment(segment));
}

private Publisher<DataPoint> mapToIntDataPoint(Result result) {
return result.map(((row, rowMetadata) -> DataPoint.builder()
.time(row.get("bucket", Instant.class))
.value(row.get("count", Integer.class))
.build()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.comet.opik.domain;

import com.comet.opik.api.DataPoint;
import com.comet.opik.api.metrics.ProjectMetricRequest;
import com.comet.opik.api.metrics.ProjectMetricResponse;
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
import com.google.inject.ImplementedBy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.ws.rs.BadRequestException;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.UUID;

@ImplementedBy(ProjectMetricsServiceImpl.class)
public interface ProjectMetricsService {
String ERR_START_BEFORE_END = "'start_time' must be before 'end_time'";

Mono<ProjectMetricResponse> getProjectMetrics(UUID projectId, ProjectMetricRequest request);
}

@Slf4j
@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
class ProjectMetricsServiceImpl implements ProjectMetricsService {
private final @NonNull TransactionTemplateAsync template;
private final @NonNull ProjectMetricsDAO projectMetricsDAO;

public static final String NAME_TRACES = "traces";

@Override
public Mono<ProjectMetricResponse> getProjectMetrics(UUID projectId, ProjectMetricRequest request) {
validate(request);

return template.nonTransaction(connection -> projectMetricsDAO.getTraceCount(projectId, request,
connection)
.map(dataPoints -> ProjectMetricResponse.builder()
.projectId(projectId)
.metricType(request.metricType())
.interval(request.interval())
.results(List.of(ProjectMetricResponse.Results.builder()
.name(NAME_TRACES)
.data(dataPoints)
.build()))
.build()));
}

private void validate(ProjectMetricRequest request) {
if (!request.intervalStart().isBefore(request.intervalEnd())) {
throw new BadRequestException(ERR_START_BEFORE_END);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.comet.opik.api.FeedbackScoreBatch;
import com.comet.opik.api.FeedbackScoreBatchItem;
import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceBatch;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.HttpHeaders;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -46,4 +47,16 @@ public void feedbackScore(List<FeedbackScoreBatchItem> score, String apiKey, Str
}
}

public void batchCreateTraces(List<Trace> traces, String apiKey, String workspaceName) {
try (var actualResponse = client.target(RESOURCE_PATH.formatted(baseURI))
.path("batch")
.request()
.header(HttpHeaders.AUTHORIZATION, apiKey)
.header(WORKSPACE_HEADER, workspaceName)
.post(Entity.json(TraceBatch.builder().traces(traces).build()))) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(204);
assertThat(actualResponse.hasEntity()).isFalse();
}
}
}
Loading

0 comments on commit ce8690a

Please sign in to comment.