Skip to content

Commit

Permalink
CM-11314 Endpoint for traces count usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Borys Tkachenko committed Sep 17, 2024
1 parent 6abef69 commit 866678a
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public static class Public {
}
}

public record ProjectPage(@JsonView( {
Project.View.Public.class}) int page,
public record ProjectPage(
@JsonView( {
Project.View.Public.class}) int page,
@JsonView({Project.View.Public.class}) int size,
@JsonView({Project.View.Public.class}) long total,
@JsonView({Project.View.Public.class}) List<Project> content)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.util.List;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record TraceCountResponse(
List<WorkspaceTraceCount> workspacesTracesCount) {
public static TraceCountResponse empty() {
return new TraceCountResponse(List.of());
}

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record WorkspaceTraceCount(
String workspace,
int traceCount) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.comet.opik.api.resources.v1.internal;

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.domain.TraceService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Path("/v1/internal/usage")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Timed
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @jakarta.inject.Inject)
@Tag(name = "System usage", description = "System usage related resource")
public class UsageResource {
private final @NonNull TraceService traceService;

@GET
@Path("/workspace-trace-counts")
@Operation(operationId = "getTracesCountForWorkspaces", summary = "Get traces count on previous day for all available workspaces", description = "Get traces count on previous day for all available workspaces", responses = {
@ApiResponse(responseCode = "200", description = "TraceCountResponse resource", content = @Content(schema = @Schema(implementation = TraceCountResponse.class)))})
public Response getTracesCountForWorkspaces() {
return traceService.countTracesPerWorkspace()
.map(tracesCountResponse -> Response.ok(tracesCountResponse).build())
.block();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.comet.opik.api.Trace;
import com.comet.opik.api.Trace.TracePage;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.TracesDelete;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.comet.opik.domain;

import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.domain.filter.FilterQueryBuilder;
Expand Down Expand Up @@ -64,6 +65,8 @@ interface TraceDAO {
Mono<List<WorkspaceAndResourceId>> getTraceWorkspace(Set<UUID> traceIds, Connection connection);

Mono<Long> batchInsert(List<Trace> traces, Connection connection);

Flux<TraceCountResponse.WorkspaceTraceCount> countTracesPerWorkspace(Connection connection);
}

@Slf4j
Expand Down Expand Up @@ -274,6 +277,16 @@ AND id in (
;
""";

private static final String TRACE_COUNT_BY_WORKSPACE_ID = """
SELECT
workspace_id,
COUNT(DISTINCT id) as trace_count
FROM traces
WHERE created_at BETWEEN toStartOfDay(yesterday()) AND toStartOfDay(today())
GROUP BY workspace_id
;
""";

private static final String COUNT_BY_PROJECT_ID = """
SELECT
count(id) as count
Expand Down Expand Up @@ -797,4 +810,14 @@ private String getOrDefault(JsonNode value) {
return value != null ? value.toString() : "";
}

@com.newrelic.api.agent.Trace(dispatcher = true)
public Flux<TraceCountResponse.WorkspaceTraceCount> countTracesPerWorkspace(Connection connection) {

var statement = connection.createStatement(TRACE_COUNT_BY_WORKSPACE_ID);

return Mono.from(statement.execute())
.flatMapMany(result -> result.map((row, rowMetadata) -> TraceCountResponse.WorkspaceTraceCount.builder()
.workspace(row.get("workspace_id", String.class))
.traceCount(row.get("trace_count", Integer.class)).build()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.comet.opik.api.Project;
import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.error.EntityAlreadyExistsException;
Expand Down Expand Up @@ -57,6 +58,8 @@ public interface TraceService {

Mono<Boolean> validateTraceWorkspace(String workspaceId, Set<UUID> traceIds);

Mono<TraceCountResponse> countTracesPerWorkspace();

}

@Slf4j
Expand Down Expand Up @@ -323,4 +326,15 @@ public Mono<Boolean> validateTraceWorkspace(@NonNull String workspaceId, @NonNul
.allMatch(trace -> workspaceId.equals(trace.workspaceId()))));
}

@Override
public Mono<TraceCountResponse> countTracesPerWorkspace() {
return template.stream(dao::countTracesPerWorkspace)
.collectList()
.flatMap(items -> Mono.just(
TraceCountResponse.builder()
.workspacesTracesCount(items)
.build()))
.switchIfEmpty(Mono.just(TraceCountResponse.empty()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package com.comet.opik.api.resources.v1.internal;

import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.resources.utils.AuthTestUtils;
import com.comet.opik.api.resources.utils.ClickHouseContainerUtils;
import com.comet.opik.api.resources.utils.ClientSupportUtils;
import com.comet.opik.api.resources.utils.MigrationUtils;
import com.comet.opik.api.resources.utils.MySQLContainerUtils;
import com.comet.opik.api.resources.utils.RedisContainerUtils;
import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils;
import com.comet.opik.api.resources.utils.TestUtils;
import com.comet.opik.api.resources.utils.WireMockUtils;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.podam.PodamFactoryUtils;
import com.redis.testcontainers.RedisContainer;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Mono;
import ru.vyarus.dropwizard.guice.test.ClientSupport;
import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension;
import uk.co.jemos.podam.api.PodamFactory;

import java.sql.SQLException;
import java.util.UUID;

import static com.comet.opik.api.resources.utils.ClickHouseContainerUtils.DATABASE_NAME;
import static com.comet.opik.api.resources.utils.MigrationUtils.CLICKHOUSE_CHANGELOG_FILE;
import static com.comet.opik.domain.ProjectService.DEFAULT_PROJECT;
import static com.comet.opik.infrastructure.auth.RequestContext.WORKSPACE_HEADER;
import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers(parallel = true)
@DisplayName("Usage Resource Test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class UsageResourceTest {
public static final String USAGE_RESOURCE_URL_TEMPLATE = "%s/v1/internal/usage";
public static final String TRACE_RESOURCE_URL_TEMPLATE = "%s/v1/private/traces";

private static final String USER = UUID.randomUUID().toString();

private static final RedisContainer REDIS = RedisContainerUtils.newRedisContainer();

private static final MySQLContainer<?> MYSQL_CONTAINER = MySQLContainerUtils.newMySQLContainer();

private static final ClickHouseContainer CLICK_HOUSE_CONTAINER = ClickHouseContainerUtils.newClickHouseContainer();

@RegisterExtension
private static final TestDropwizardAppExtension app;

private static final WireMockUtils.WireMockRuntime wireMock;

static {
MYSQL_CONTAINER.start();
CLICK_HOUSE_CONTAINER.start();
REDIS.start();

wireMock = WireMockUtils.startWireMock();

var databaseAnalyticsFactory = ClickHouseContainerUtils.newDatabaseAnalyticsFactory(
CLICK_HOUSE_CONTAINER, DATABASE_NAME);

app = TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension(
MYSQL_CONTAINER.getJdbcUrl(), databaseAnalyticsFactory, wireMock.runtimeInfo(), REDIS.getRedisURI());
}

private final PodamFactory factory = PodamFactoryUtils.newPodamFactory();

private String baseURI;
private ClientSupport client;
private TransactionTemplate template;

@BeforeAll
void setUpAll(ClientSupport client, Jdbi jdbi, TransactionTemplate template) throws SQLException {

MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters());

try (var connection = CLICK_HOUSE_CONTAINER.createConnection("")) {
MigrationUtils.runDbMigration(connection, CLICKHOUSE_CHANGELOG_FILE,
ClickHouseContainerUtils.migrationParameters());
}

this.baseURI = "http://localhost:%d".formatted(client.getPort());
this.client = client;
this.template = template;

ClientSupportUtils.config(client);
}

@AfterAll
void tearDownAll() {
wireMock.server().stop();
}

private static void mockTargetWorkspace(String apiKey, String workspaceName, String workspaceId) {
AuthTestUtils.mockTargetWorkspace(wireMock.server(), apiKey, workspaceName, workspaceId, USER);
}

@Nested
@DisplayName("Opik usage:")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Usage {

private final String okApikey = UUID.randomUUID().toString();

@Test
@DisplayName("Get traces count on previous day for all workspaces, no Auth")
void tracesCountForWorkspace() {
// Setup mock workspace with traces
var workspaceName = UUID.randomUUID().toString();
var workspaceId = UUID.randomUUID().toString();
int tracesCount = setupTracesForWorkspace(workspaceName, workspaceId, okApikey);

// Change created_at to the previous day in order to capture those traces in count query, since for Stripe we need to count it daily for yesterday
String updateCreatedAt = "ALTER TABLE traces UPDATE created_at = subtractDays(created_at, 1) WHERE workspace_id=:workspace_id;";
template.nonTransaction(connection -> {
var statement = connection.createStatement(updateCreatedAt)
.bind("workspace_id", workspaceId);
return Mono.from(statement.execute());
}).block();

// Setup second workspace with traces, but leave created_at date set to today, so traces do not end up in the pool
var workspaceNameForToday = UUID.randomUUID().toString();
var workspaceIdForToday = UUID.randomUUID().toString();
setupTracesForWorkspace(workspaceNameForToday, workspaceIdForToday, okApikey);

try (var actualResponse = client.target(USAGE_RESOURCE_URL_TEMPLATE.formatted(baseURI))
.path("/workspace-trace-counts")
.request()
.header(HttpHeaders.AUTHORIZATION, okApikey)
.header(WORKSPACE_HEADER, workspaceName)
.get()) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200);
assertThat(actualResponse.hasEntity()).isTrue();

var response = actualResponse.readEntity(TraceCountResponse.class);
assertThat(response.workspacesTracesCount().size()).isEqualTo(1);
assertThat(response.workspacesTracesCount().get(0))
.isEqualTo(new TraceCountResponse.WorkspaceTraceCount(workspaceId, tracesCount));
}
}
}

private int setupTracesForWorkspace(String workspaceName, String workspaceId, String okApikey) {
mockTargetWorkspace(okApikey, workspaceName, workspaceId);

var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class)
.stream()
.map(t -> t.toBuilder()
.projectId(null)
.projectName(DEFAULT_PROJECT)
.feedbackScores(null)
.build())
.toList();

traces.forEach(trace -> createTrace(trace, okApikey, workspaceName));

return traces.size();
}

private UUID createTrace(Trace trace, String apiKey, String workspaceName) {
try (var actualResponse = client.target(TRACE_RESOURCE_URL_TEMPLATE.formatted(baseURI))
.request()
.accept(MediaType.APPLICATION_JSON_TYPE)
.header(HttpHeaders.AUTHORIZATION, apiKey)
.header(WORKSPACE_HEADER, workspaceName)
.post(Entity.json(trace))) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(201);
assertThat(actualResponse.hasEntity()).isFalse();

var actualId = TestUtils.getIdFromLocation(actualResponse.getLocation());

if (trace.id() != null) {
assertThat(actualId).isEqualTo(trace.id());
}
return actualId;
}
}
}
Loading

0 comments on commit 866678a

Please sign in to comment.