Skip to content

Commit

Permalink
CM-11314 Endpoint for traces count
Browse files Browse the repository at this point in the history
  • Loading branch information
Borys Tkachenko committed Sep 17, 2024
1 parent 6abef69 commit 003c00c
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public static class Public {
}
}

public record ProjectPage(@JsonView( {
Project.View.Public.class}) int page,
public record ProjectPage(
@JsonView( {
Project.View.Public.class}) int page,
@JsonView({Project.View.Public.class}) int size,
@JsonView({Project.View.Public.class}) long total,
@JsonView({Project.View.Public.class}) List<Project> content)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.util.List;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record TraceCountResponse(
List<WorkspaceTraceCount> workspacesTracesCount) {
public static TraceCountResponse empty() {
return new TraceCountResponse(List.of());
}

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record WorkspaceTraceCount(
String workspace,
int traceCount) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.comet.opik.api.Trace;
import com.comet.opik.api.Trace.TracePage;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.TracesDelete;
Expand Down Expand Up @@ -121,6 +122,16 @@ public Response getById(@PathParam("id") UUID id) {
return Response.ok(trace).build();
}

@GET
@Path("/workspace-trace-counts")
@Operation(operationId = "getTracesCountForWorkspaces", summary = "Get traces count on previous day for all available workspaces", description = "Get traces count on previous day for all available workspaces", responses = {
@ApiResponse(responseCode = "200", description = "TraceCountResponse resource", content = @Content(schema = @Schema(implementation = TraceCountResponse.class)))})
public Response getTracesCountForWorkspaces() {
return service.countTracesPerWorkspace()
.map(tracesCountResponse -> Response.ok(tracesCountResponse).build())
.block();
}

@POST
@Operation(operationId = "createTrace", summary = "Create trace", description = "Get trace", responses = {
@ApiResponse(responseCode = "201", description = "Created", headers = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.comet.opik.domain;

import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.domain.filter.FilterQueryBuilder;
Expand Down Expand Up @@ -64,6 +65,8 @@ interface TraceDAO {
Mono<List<WorkspaceAndResourceId>> getTraceWorkspace(Set<UUID> traceIds, Connection connection);

Mono<Long> batchInsert(List<Trace> traces, Connection connection);

Flux<TraceCountResponse.WorkspaceTraceCount> countTracesPerWorkspace(Connection connection);
}

@Slf4j
Expand Down Expand Up @@ -274,6 +277,16 @@ AND id in (
;
""";

private static final String TRACE_COUNT_BY_WORKSPACE_ID = """
SELECT
workspace_id,
COUNT(DISTINCT id) as trace_count
FROM traces
WHERE created_at BETWEEN toStartOfDay(yesterday()) AND toStartOfDay(today())
GROUP BY workspace_id
;
""";

private static final String COUNT_BY_PROJECT_ID = """
SELECT
count(id) as count
Expand Down Expand Up @@ -797,4 +810,14 @@ private String getOrDefault(JsonNode value) {
return value != null ? value.toString() : "";
}

@com.newrelic.api.agent.Trace(dispatcher = true)
public Flux<TraceCountResponse.WorkspaceTraceCount> countTracesPerWorkspace(Connection connection) {

var statement = connection.createStatement(TRACE_COUNT_BY_WORKSPACE_ID);

return Mono.from(statement.execute())
.flatMapMany(result -> result.map((row, rowMetadata) -> TraceCountResponse.WorkspaceTraceCount.builder()
.workspace(row.get("workspace_id", String.class))
.traceCount(row.get("trace_count", Integer.class)).build()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.comet.opik.api.Project;
import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.error.EntityAlreadyExistsException;
Expand Down Expand Up @@ -57,6 +58,8 @@ public interface TraceService {

Mono<Boolean> validateTraceWorkspace(String workspaceId, Set<UUID> traceIds);

Mono<TraceCountResponse> countTracesPerWorkspace();

}

@Slf4j
Expand Down Expand Up @@ -323,4 +326,15 @@ public Mono<Boolean> validateTraceWorkspace(@NonNull String workspaceId, @NonNul
.allMatch(trace -> workspaceId.equals(trace.workspaceId()))));
}

@Override
public Mono<TraceCountResponse> countTracesPerWorkspace() {
return template.stream(dao::countTracesPerWorkspace)
.collectList()
.flatMap(items -> Mono.just(
TraceCountResponse.builder()
.workspacesTracesCount(items)
.build()))
.switchIfEmpty(Mono.just(TraceCountResponse.empty()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.comet.opik.api.SpanBatch;
import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.TracesDelete;
import com.comet.opik.api.error.ErrorMessage;
Expand All @@ -30,6 +31,7 @@
import com.comet.opik.domain.FeedbackScoreMapper;
import com.comet.opik.domain.SpanType;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.podam.PodamFactoryUtils;
import com.comet.opik.utils.JsonUtils;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -144,9 +146,10 @@ class TracesResourceTest {

private String baseURI;
private ClientSupport client;
private TransactionTemplate template;

@BeforeAll
void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException {
void setUpAll(ClientSupport client, Jdbi jdbi, TransactionTemplate template) throws SQLException {

MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters());

Expand All @@ -157,6 +160,7 @@ void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException {

this.baseURI = "http://localhost:%d".formatted(client.getPort());
this.client = client;
this.template = template;

ClientSupportUtils.config(client);

Expand Down Expand Up @@ -338,18 +342,7 @@ void get__whenApiKeyIsPresent__thenReturnProperResponse(String apiKey, boolean e
var workspaceName = UUID.randomUUID().toString();
var workspaceId = UUID.randomUUID().toString();

mockTargetWorkspace(okApikey, workspaceName, workspaceId);

var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class)
.stream()
.map(t -> t.toBuilder()
.projectId(null)
.projectName(DEFAULT_PROJECT)
.feedbackScores(null)
.build())
.toList();

traces.forEach(trace -> create(trace, okApikey, workspaceName));
int tracesCount = setupTracesForWorkspace(workspaceName, workspaceId, okApikey);

try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI))
.queryParam("project_name", DEFAULT_PROJECT)
Expand All @@ -363,7 +356,7 @@ void get__whenApiKeyIsPresent__thenReturnProperResponse(String apiKey, boolean e
assertThat(actualResponse.hasEntity()).isTrue();

var response = actualResponse.readEntity(Trace.TracePage.class);
assertThat(response.content()).hasSize(traces.size());
assertThat(response.content()).hasSize(tracesCount);
} else {
assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(401);
assertThat(actualResponse.readEntity(io.dropwizard.jersey.errors.ErrorMessage.class))
Expand Down Expand Up @@ -482,7 +475,6 @@ void feedbackBatch__whenApiKeyIsPresent__thenReturnProperResponse(String apiKey,
}

}

}

@Nested
Expand Down Expand Up @@ -4669,6 +4661,61 @@ private void createAndAssert(String path, FeedbackScoreBatch request, String wor
}
}

@Nested
@DisplayName("Opik usage:")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Usage {

private final String okApikey = UUID.randomUUID().toString();

@BeforeEach
void setUp() {
wireMock.server().stubFor(
post(urlPathEqualTo("/opik/auth"))
.withHeader(HttpHeaders.AUTHORIZATION, equalTo(""))
.withRequestBody(matchingJsonPath("$.workspaceName", matching(".+")))
.willReturn(WireMock.unauthorized()));
}

@Test
@DisplayName("Get traces count on previous day for all workspaces, no Auth")
void tracesCountForWorkspace() {
// Setup mock workspace with traces
var workspaceName = UUID.randomUUID().toString();
var workspaceId = UUID.randomUUID().toString();
int tracesCount = setupTracesForWorkspace(workspaceName, workspaceId, okApikey);

// Change created_at to the previous day in order to capture those traces in count query, since for Stripe we need to count it daily for yesterday
String updateCreatedAt = "ALTER TABLE traces UPDATE created_at = subtractDays(created_at, 1) WHERE workspace_id=:workspace_id;";
template.nonTransaction(connection -> {
var statement = connection.createStatement(updateCreatedAt)
.bind("workspace_id", workspaceId);
return Mono.from(statement.execute());
}).block();

// Setup second workspace with traces, but leave created_at date set to today, so traces do not end up in the pool
var workspaceNameForToday = UUID.randomUUID().toString();
var workspaceIdForToday = UUID.randomUUID().toString();
setupTracesForWorkspace(workspaceNameForToday, workspaceIdForToday, okApikey);

try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI))
.path("/workspace-trace-counts")
.request()
.header(HttpHeaders.AUTHORIZATION, okApikey)
.header(WORKSPACE_HEADER, workspaceName)
.get()) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200);
assertThat(actualResponse.hasEntity()).isTrue();

var response = actualResponse.readEntity(TraceCountResponse.class);
assertThat(response.workspacesTracesCount().size()).isEqualTo(1);
assertThat(response.workspacesTracesCount().get(0))
.isEqualTo(new TraceCountResponse.WorkspaceTraceCount(workspaceId, tracesCount));
}
}
}

private void assertEqualsForScores(FeedbackScore actualScore, FeedbackScore expectedScore) {
assertThat(actualScore)
.usingRecursiveComparison()
Expand All @@ -4695,4 +4742,21 @@ private <T, R> void assertEqualsForScores(List<T> expected, List<R> actual) {
.ignoringCollectionOrder()
.isEqualTo(expected);
}

private int setupTracesForWorkspace(String workspaceName, String workspaceId, String okApikey) {
mockTargetWorkspace(okApikey, workspaceName, workspaceId);

var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class)
.stream()
.map(t -> t.toBuilder()
.projectId(null)
.projectName(DEFAULT_PROJECT)
.feedbackScores(null)
.build())
.toList();

traces.forEach(trace -> TracesResourceTest.this.create(trace, okApikey, workspaceName));

return traces.size();
}
}

0 comments on commit 003c00c

Please sign in to comment.