From 4553b5c3d7fd475dfd40e9c1aac073ee18a385df Mon Sep 17 00:00:00 2001 From: BorisTkachenko <35521895+BorisTkachenko@users.noreply.github.com> Date: Tue, 17 Sep 2024 17:13:39 +0200 Subject: [PATCH] CM-11314 Endpoint for traces count usage (#223) --- .../main/java/com/comet/opik/api/Project.java | 5 +- .../comet/opik/api/TraceCountResponse.java | 26 +++ .../resources/v1/internal/UsageResource.java | 40 ++++ .../java/com/comet/opik/domain/TraceDAO.java | 23 +++ .../com/comet/opik/domain/TraceService.java | 14 ++ .../v1/internal/UsageResourceTest.java | 193 ++++++++++++++++++ .../resources/v1/priv/TracesResourceTest.java | 33 +-- 7 files changed, 318 insertions(+), 16 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/api/TraceCountResponse.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/internal/UsageResource.java create mode 100644 apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/internal/UsageResourceTest.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java b/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java index 60e62580be..a0479bcad6 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/Project.java @@ -39,8 +39,9 @@ public static class Public { } } - public record ProjectPage(@JsonView( { - Project.View.Public.class}) int page, + public record ProjectPage( + @JsonView( { + Project.View.Public.class}) int page, @JsonView({Project.View.Public.class}) int size, @JsonView({Project.View.Public.class}) long total, @JsonView({Project.View.Public.class}) List content) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/TraceCountResponse.java b/apps/opik-backend/src/main/java/com/comet/opik/api/TraceCountResponse.java new file mode 100644 index 0000000000..e3e97aea69 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/TraceCountResponse.java @@ -0,0 +1,26 @@ +package com.comet.opik.api; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.Builder; + +import java.util.List; + +@Builder(toBuilder = true) +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public record TraceCountResponse( + List workspacesTracesCount) { + public static TraceCountResponse empty() { + return new TraceCountResponse(List.of()); + } + + @Builder(toBuilder = true) + @JsonIgnoreProperties(ignoreUnknown = true) + @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) + public record WorkspaceTraceCount( + String workspace, + int traceCount) { + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/internal/UsageResource.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/internal/UsageResource.java new file mode 100644 index 0000000000..c2fbbb59bf --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/internal/UsageResource.java @@ -0,0 +1,40 @@ +package com.comet.opik.api.resources.v1.internal; + +import com.codahale.metrics.annotation.Timed; +import com.comet.opik.api.TraceCountResponse; +import com.comet.opik.domain.TraceService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Path("/v1/internal/usage") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Timed +@Slf4j +@RequiredArgsConstructor(onConstructor_ = @jakarta.inject.Inject) +@Tag(name = "System usage", description = "System usage related resource") +public class UsageResource { + private final @NonNull TraceService traceService; + + @GET + @Path("/workspace-trace-counts") + @Operation(operationId = "getTracesCountForWorkspaces", summary = "Get traces count on previous day for all available workspaces", description = "Get traces count on previous day for all available workspaces", responses = { + @ApiResponse(responseCode = "200", description = "TraceCountResponse resource", content = @Content(schema = @Schema(implementation = TraceCountResponse.class)))}) + public Response getTracesCountForWorkspaces() { + return traceService.countTracesPerWorkspace() + .map(tracesCountResponse -> Response.ok(tracesCountResponse).build()) + .block(); + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java index e31135a3c7..bf967efb43 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java @@ -1,6 +1,7 @@ package com.comet.opik.domain; import com.comet.opik.api.Trace; +import com.comet.opik.api.TraceCountResponse; import com.comet.opik.api.TraceSearchCriteria; import com.comet.opik.api.TraceUpdate; import com.comet.opik.domain.filter.FilterQueryBuilder; @@ -64,6 +65,8 @@ interface TraceDAO { Mono> getTraceWorkspace(Set traceIds, Connection connection); Mono batchInsert(List traces, Connection connection); + + Flux countTracesPerWorkspace(Connection connection); } @Slf4j @@ -274,6 +277,16 @@ AND id in ( ; """; + private static final String TRACE_COUNT_BY_WORKSPACE_ID = """ + SELECT + workspace_id, + COUNT(DISTINCT id) as trace_count + FROM traces + WHERE created_at BETWEEN toStartOfDay(yesterday()) AND toStartOfDay(today()) + GROUP BY workspace_id + ; + """; + private static final String COUNT_BY_PROJECT_ID = """ SELECT count(id) as count @@ -797,4 +810,14 @@ private String getOrDefault(JsonNode value) { return value != null ? value.toString() : ""; } + @com.newrelic.api.agent.Trace(dispatcher = true) + public Flux countTracesPerWorkspace(Connection connection) { + + var statement = connection.createStatement(TRACE_COUNT_BY_WORKSPACE_ID); + + return Mono.from(statement.execute()) + .flatMapMany(result -> result.map((row, rowMetadata) -> TraceCountResponse.WorkspaceTraceCount.builder() + .workspace(row.get("workspace_id", String.class)) + .traceCount(row.get("trace_count", Integer.class)).build())); + } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java index 956b01eb36..b1be4a9710 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java @@ -4,6 +4,7 @@ import com.comet.opik.api.Project; import com.comet.opik.api.Trace; import com.comet.opik.api.TraceBatch; +import com.comet.opik.api.TraceCountResponse; import com.comet.opik.api.TraceSearchCriteria; import com.comet.opik.api.TraceUpdate; import com.comet.opik.api.error.EntityAlreadyExistsException; @@ -57,6 +58,8 @@ public interface TraceService { Mono validateTraceWorkspace(String workspaceId, Set traceIds); + Mono countTracesPerWorkspace(); + } @Slf4j @@ -323,4 +326,15 @@ public Mono validateTraceWorkspace(@NonNull String workspaceId, @NonNul .allMatch(trace -> workspaceId.equals(trace.workspaceId())))); } + @Override + public Mono countTracesPerWorkspace() { + return template.stream(dao::countTracesPerWorkspace) + .collectList() + .flatMap(items -> Mono.just( + TraceCountResponse.builder() + .workspacesTracesCount(items) + .build())) + .switchIfEmpty(Mono.just(TraceCountResponse.empty())); + } + } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/internal/UsageResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/internal/UsageResourceTest.java new file mode 100644 index 0000000000..78e22c95ec --- /dev/null +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/internal/UsageResourceTest.java @@ -0,0 +1,193 @@ +package com.comet.opik.api.resources.v1.internal; + +import com.comet.opik.api.Trace; +import com.comet.opik.api.TraceCountResponse; +import com.comet.opik.api.resources.utils.AuthTestUtils; +import com.comet.opik.api.resources.utils.ClickHouseContainerUtils; +import com.comet.opik.api.resources.utils.ClientSupportUtils; +import com.comet.opik.api.resources.utils.MigrationUtils; +import com.comet.opik.api.resources.utils.MySQLContainerUtils; +import com.comet.opik.api.resources.utils.RedisContainerUtils; +import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils; +import com.comet.opik.api.resources.utils.TestUtils; +import com.comet.opik.api.resources.utils.WireMockUtils; +import com.comet.opik.infrastructure.db.TransactionTemplate; +import com.comet.opik.podam.PodamFactoryUtils; +import com.redis.testcontainers.RedisContainer; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import reactor.core.publisher.Mono; +import ru.vyarus.dropwizard.guice.test.ClientSupport; +import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension; +import uk.co.jemos.podam.api.PodamFactory; + +import java.sql.SQLException; +import java.util.UUID; + +import static com.comet.opik.api.resources.utils.ClickHouseContainerUtils.DATABASE_NAME; +import static com.comet.opik.api.resources.utils.MigrationUtils.CLICKHOUSE_CHANGELOG_FILE; +import static com.comet.opik.domain.ProjectService.DEFAULT_PROJECT; +import static com.comet.opik.infrastructure.auth.RequestContext.WORKSPACE_HEADER; +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers(parallel = true) +@DisplayName("Usage Resource Test") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class UsageResourceTest { + public static final String USAGE_RESOURCE_URL_TEMPLATE = "%s/v1/internal/usage"; + public static final String TRACE_RESOURCE_URL_TEMPLATE = "%s/v1/private/traces"; + + private static final String USER = UUID.randomUUID().toString(); + + private static final RedisContainer REDIS = RedisContainerUtils.newRedisContainer(); + + private static final MySQLContainer MYSQL_CONTAINER = MySQLContainerUtils.newMySQLContainer(); + + private static final ClickHouseContainer CLICK_HOUSE_CONTAINER = ClickHouseContainerUtils.newClickHouseContainer(); + + @RegisterExtension + private static final TestDropwizardAppExtension app; + + private static final WireMockUtils.WireMockRuntime wireMock; + + static { + MYSQL_CONTAINER.start(); + CLICK_HOUSE_CONTAINER.start(); + REDIS.start(); + + wireMock = WireMockUtils.startWireMock(); + + var databaseAnalyticsFactory = ClickHouseContainerUtils.newDatabaseAnalyticsFactory( + CLICK_HOUSE_CONTAINER, DATABASE_NAME); + + app = TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension( + MYSQL_CONTAINER.getJdbcUrl(), databaseAnalyticsFactory, wireMock.runtimeInfo(), REDIS.getRedisURI()); + } + + private final PodamFactory factory = PodamFactoryUtils.newPodamFactory(); + + private String baseURI; + private ClientSupport client; + private TransactionTemplate template; + + @BeforeAll + void setUpAll(ClientSupport client, Jdbi jdbi, TransactionTemplate template) throws SQLException { + + MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters()); + + try (var connection = CLICK_HOUSE_CONTAINER.createConnection("")) { + MigrationUtils.runDbMigration(connection, CLICKHOUSE_CHANGELOG_FILE, + ClickHouseContainerUtils.migrationParameters()); + } + + this.baseURI = "http://localhost:%d".formatted(client.getPort()); + this.client = client; + this.template = template; + + ClientSupportUtils.config(client); + } + + @AfterAll + void tearDownAll() { + wireMock.server().stop(); + } + + private static void mockTargetWorkspace(String apiKey, String workspaceName, String workspaceId) { + AuthTestUtils.mockTargetWorkspace(wireMock.server(), apiKey, workspaceName, workspaceId, USER); + } + + @Nested + @DisplayName("Opik usage:") + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class Usage { + + private final String okApikey = UUID.randomUUID().toString(); + + @Test + @DisplayName("Get traces count on previous day for all workspaces, no Auth") + void tracesCountForWorkspace() { + // Setup mock workspace with traces + var workspaceName = UUID.randomUUID().toString(); + var workspaceId = UUID.randomUUID().toString(); + int tracesCount = setupTracesForWorkspace(workspaceName, workspaceId, okApikey); + + // Change created_at to the previous day in order to capture those traces in count query, since for Stripe we need to count it daily for yesterday + String updateCreatedAt = "ALTER TABLE traces UPDATE created_at = subtractDays(created_at, 1) WHERE workspace_id=:workspace_id;"; + template.nonTransaction(connection -> { + var statement = connection.createStatement(updateCreatedAt) + .bind("workspace_id", workspaceId); + return Mono.from(statement.execute()); + }).block(); + + // Setup second workspace with traces, but leave created_at date set to today, so traces do not end up in the pool + var workspaceNameForToday = UUID.randomUUID().toString(); + var workspaceIdForToday = UUID.randomUUID().toString(); + setupTracesForWorkspace(workspaceNameForToday, workspaceIdForToday, okApikey); + + try (var actualResponse = client.target(USAGE_RESOURCE_URL_TEMPLATE.formatted(baseURI)) + .path("/workspace-trace-counts") + .request() + .header(HttpHeaders.AUTHORIZATION, okApikey) + .header(WORKSPACE_HEADER, workspaceName) + .get()) { + + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200); + assertThat(actualResponse.hasEntity()).isTrue(); + + var response = actualResponse.readEntity(TraceCountResponse.class); + assertThat(response.workspacesTracesCount().size()).isEqualTo(1); + assertThat(response.workspacesTracesCount().get(0)) + .isEqualTo(new TraceCountResponse.WorkspaceTraceCount(workspaceId, tracesCount)); + } + } + } + + private int setupTracesForWorkspace(String workspaceName, String workspaceId, String okApikey) { + mockTargetWorkspace(okApikey, workspaceName, workspaceId); + + var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class) + .stream() + .map(t -> t.toBuilder() + .projectId(null) + .projectName(DEFAULT_PROJECT) + .feedbackScores(null) + .build()) + .toList(); + + traces.forEach(trace -> createTrace(trace, okApikey, workspaceName)); + + return traces.size(); + } + + private UUID createTrace(Trace trace, String apiKey, String workspaceName) { + try (var actualResponse = client.target(TRACE_RESOURCE_URL_TEMPLATE.formatted(baseURI)) + .request() + .accept(MediaType.APPLICATION_JSON_TYPE) + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .post(Entity.json(trace))) { + + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(201); + assertThat(actualResponse.hasEntity()).isFalse(); + + var actualId = TestUtils.getIdFromLocation(actualResponse.getLocation()); + + if (trace.id() != null) { + assertThat(actualId).isEqualTo(trace.id()); + } + return actualId; + } + } +} diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java index 8bdf9fb926..4f822c9c9a 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/TracesResourceTest.java @@ -338,18 +338,7 @@ void get__whenApiKeyIsPresent__thenReturnProperResponse(String apiKey, boolean e var workspaceName = UUID.randomUUID().toString(); var workspaceId = UUID.randomUUID().toString(); - mockTargetWorkspace(okApikey, workspaceName, workspaceId); - - var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class) - .stream() - .map(t -> t.toBuilder() - .projectId(null) - .projectName(DEFAULT_PROJECT) - .feedbackScores(null) - .build()) - .toList(); - - traces.forEach(trace -> create(trace, okApikey, workspaceName)); + int tracesCount = setupTracesForWorkspace(workspaceName, workspaceId, okApikey); try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) .queryParam("project_name", DEFAULT_PROJECT) @@ -363,7 +352,7 @@ void get__whenApiKeyIsPresent__thenReturnProperResponse(String apiKey, boolean e assertThat(actualResponse.hasEntity()).isTrue(); var response = actualResponse.readEntity(Trace.TracePage.class); - assertThat(response.content()).hasSize(traces.size()); + assertThat(response.content()).hasSize(tracesCount); } else { assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(401); assertThat(actualResponse.readEntity(io.dropwizard.jersey.errors.ErrorMessage.class)) @@ -482,7 +471,6 @@ void feedbackBatch__whenApiKeyIsPresent__thenReturnProperResponse(String apiKey, } } - } @Nested @@ -4695,4 +4683,21 @@ private void assertEqualsForScores(List expected, List actual) { .ignoringCollectionOrder() .isEqualTo(expected); } + + private int setupTracesForWorkspace(String workspaceName, String workspaceId, String okApikey) { + mockTargetWorkspace(okApikey, workspaceName, workspaceId); + + var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class) + .stream() + .map(t -> t.toBuilder() + .projectId(null) + .projectName(DEFAULT_PROJECT) + .feedbackScores(null) + .build()) + .toList(); + + traces.forEach(trace -> TracesResourceTest.this.create(trace, okApikey, workspaceName)); + + return traces.size(); + } }