Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CM-11314] Opik endpoint for traces count #223

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public static class Public {
}
}

public record ProjectPage(@JsonView( {
Project.View.Public.class}) int page,
public record ProjectPage(
@JsonView( {
Project.View.Public.class}) int page,
@JsonView({Project.View.Public.class}) int size,
@JsonView({Project.View.Public.class}) long total,
@JsonView({Project.View.Public.class}) List<Project> content)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.util.List;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record TraceCountResponse(
List<WorkspaceTraceCount> workspacesTracesCount) {
public static TraceCountResponse empty() {
return new TraceCountResponse(List.of());
}

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record WorkspaceTraceCount(
String workspace,
int traceCount) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.comet.opik.api.resources.v1.internal;

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.domain.TraceService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Path("/v1/internal/usage")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Timed
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @jakarta.inject.Inject)
@Tag(name = "System usage", description = "System usage related resource")
public class UsageResource {
private final @NonNull TraceService traceService;

@GET
@Path("/workspace-trace-counts")
@Operation(operationId = "getTracesCountForWorkspaces", summary = "Get traces count on previous day for all available workspaces", description = "Get traces count on previous day for all available workspaces", responses = {
@ApiResponse(responseCode = "200", description = "TraceCountResponse resource", content = @Content(schema = @Schema(implementation = TraceCountResponse.class)))})
public Response getTracesCountForWorkspaces() {
return traceService.countTracesPerWorkspace()
.map(tracesCountResponse -> Response.ok(tracesCountResponse).build())
.block();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.comet.opik.domain;

import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.domain.filter.FilterQueryBuilder;
Expand Down Expand Up @@ -64,6 +65,8 @@ interface TraceDAO {
Mono<List<WorkspaceAndResourceId>> getTraceWorkspace(Set<UUID> traceIds, Connection connection);

Mono<Long> batchInsert(List<Trace> traces, Connection connection);

Flux<TraceCountResponse.WorkspaceTraceCount> countTracesPerWorkspace(Connection connection);
}

@Slf4j
Expand Down Expand Up @@ -274,6 +277,16 @@ AND id in (
;
""";

private static final String TRACE_COUNT_BY_WORKSPACE_ID = """
SELECT
workspace_id,
COUNT(DISTINCT id) as trace_count
FROM traces
WHERE created_at BETWEEN toStartOfDay(yesterday()) AND toStartOfDay(today())
GROUP BY workspace_id
Nimrod007 marked this conversation as resolved.
Show resolved Hide resolved
;
""";

private static final String COUNT_BY_PROJECT_ID = """
SELECT
count(id) as count
Expand Down Expand Up @@ -797,4 +810,14 @@ private String getOrDefault(JsonNode value) {
return value != null ? value.toString() : "";
}

@com.newrelic.api.agent.Trace(dispatcher = true)
public Flux<TraceCountResponse.WorkspaceTraceCount> countTracesPerWorkspace(Connection connection) {

var statement = connection.createStatement(TRACE_COUNT_BY_WORKSPACE_ID);

return Mono.from(statement.execute())
.flatMapMany(result -> result.map((row, rowMetadata) -> TraceCountResponse.WorkspaceTraceCount.builder()
.workspace(row.get("workspace_id", String.class))
.traceCount(row.get("trace_count", Integer.class)).build()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.comet.opik.api.Project;
import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.TraceSearchCriteria;
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.error.EntityAlreadyExistsException;
Expand Down Expand Up @@ -57,6 +58,8 @@ public interface TraceService {

Mono<Boolean> validateTraceWorkspace(String workspaceId, Set<UUID> traceIds);

Mono<TraceCountResponse> countTracesPerWorkspace();

}

@Slf4j
Expand Down Expand Up @@ -323,4 +326,15 @@ public Mono<Boolean> validateTraceWorkspace(@NonNull String workspaceId, @NonNul
.allMatch(trace -> workspaceId.equals(trace.workspaceId()))));
}

@Override
public Mono<TraceCountResponse> countTracesPerWorkspace() {
return template.stream(dao::countTracesPerWorkspace)
.collectList()
.flatMap(items -> Mono.just(
TraceCountResponse.builder()
.workspacesTracesCount(items)
.build()))
.switchIfEmpty(Mono.just(TraceCountResponse.empty()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package com.comet.opik.api.resources.v1.internal;

import com.comet.opik.api.Trace;
import com.comet.opik.api.TraceCountResponse;
import com.comet.opik.api.resources.utils.AuthTestUtils;
import com.comet.opik.api.resources.utils.ClickHouseContainerUtils;
import com.comet.opik.api.resources.utils.ClientSupportUtils;
import com.comet.opik.api.resources.utils.MigrationUtils;
import com.comet.opik.api.resources.utils.MySQLContainerUtils;
import com.comet.opik.api.resources.utils.RedisContainerUtils;
import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils;
import com.comet.opik.api.resources.utils.TestUtils;
import com.comet.opik.api.resources.utils.WireMockUtils;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.podam.PodamFactoryUtils;
import com.redis.testcontainers.RedisContainer;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Mono;
import ru.vyarus.dropwizard.guice.test.ClientSupport;
import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension;
import uk.co.jemos.podam.api.PodamFactory;

import java.sql.SQLException;
import java.util.UUID;

import static com.comet.opik.api.resources.utils.ClickHouseContainerUtils.DATABASE_NAME;
import static com.comet.opik.api.resources.utils.MigrationUtils.CLICKHOUSE_CHANGELOG_FILE;
import static com.comet.opik.domain.ProjectService.DEFAULT_PROJECT;
import static com.comet.opik.infrastructure.auth.RequestContext.WORKSPACE_HEADER;
import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers(parallel = true)
@DisplayName("Usage Resource Test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class UsageResourceTest {
public static final String USAGE_RESOURCE_URL_TEMPLATE = "%s/v1/internal/usage";
public static final String TRACE_RESOURCE_URL_TEMPLATE = "%s/v1/private/traces";

private static final String USER = UUID.randomUUID().toString();

private static final RedisContainer REDIS = RedisContainerUtils.newRedisContainer();

private static final MySQLContainer<?> MYSQL_CONTAINER = MySQLContainerUtils.newMySQLContainer();

private static final ClickHouseContainer CLICK_HOUSE_CONTAINER = ClickHouseContainerUtils.newClickHouseContainer();

@RegisterExtension
private static final TestDropwizardAppExtension app;

private static final WireMockUtils.WireMockRuntime wireMock;

static {
MYSQL_CONTAINER.start();
CLICK_HOUSE_CONTAINER.start();
REDIS.start();

wireMock = WireMockUtils.startWireMock();

var databaseAnalyticsFactory = ClickHouseContainerUtils.newDatabaseAnalyticsFactory(
CLICK_HOUSE_CONTAINER, DATABASE_NAME);

app = TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension(
MYSQL_CONTAINER.getJdbcUrl(), databaseAnalyticsFactory, wireMock.runtimeInfo(), REDIS.getRedisURI());
}

private final PodamFactory factory = PodamFactoryUtils.newPodamFactory();

private String baseURI;
private ClientSupport client;
private TransactionTemplate template;

@BeforeAll
void setUpAll(ClientSupport client, Jdbi jdbi, TransactionTemplate template) throws SQLException {

MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters());

try (var connection = CLICK_HOUSE_CONTAINER.createConnection("")) {
MigrationUtils.runDbMigration(connection, CLICKHOUSE_CHANGELOG_FILE,
ClickHouseContainerUtils.migrationParameters());
}

this.baseURI = "http://localhost:%d".formatted(client.getPort());
this.client = client;
this.template = template;

ClientSupportUtils.config(client);
}

@AfterAll
void tearDownAll() {
wireMock.server().stop();
}

private static void mockTargetWorkspace(String apiKey, String workspaceName, String workspaceId) {
AuthTestUtils.mockTargetWorkspace(wireMock.server(), apiKey, workspaceName, workspaceId, USER);
}

@Nested
@DisplayName("Opik usage:")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Usage {

private final String okApikey = UUID.randomUUID().toString();

@Test
@DisplayName("Get traces count on previous day for all workspaces, no Auth")
void tracesCountForWorkspace() {
// Setup mock workspace with traces
var workspaceName = UUID.randomUUID().toString();
var workspaceId = UUID.randomUUID().toString();
int tracesCount = setupTracesForWorkspace(workspaceName, workspaceId, okApikey);

// Change created_at to the previous day in order to capture those traces in count query, since for Stripe we need to count it daily for yesterday
String updateCreatedAt = "ALTER TABLE traces UPDATE created_at = subtractDays(created_at, 1) WHERE workspace_id=:workspace_id;";
template.nonTransaction(connection -> {
var statement = connection.createStatement(updateCreatedAt)
.bind("workspace_id", workspaceId);
return Mono.from(statement.execute());
}).block();

// Setup second workspace with traces, but leave created_at date set to today, so traces do not end up in the pool
var workspaceNameForToday = UUID.randomUUID().toString();
var workspaceIdForToday = UUID.randomUUID().toString();
setupTracesForWorkspace(workspaceNameForToday, workspaceIdForToday, okApikey);

try (var actualResponse = client.target(USAGE_RESOURCE_URL_TEMPLATE.formatted(baseURI))
.path("/workspace-trace-counts")
.request()
.header(HttpHeaders.AUTHORIZATION, okApikey)
.header(WORKSPACE_HEADER, workspaceName)
.get()) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200);
assertThat(actualResponse.hasEntity()).isTrue();

var response = actualResponse.readEntity(TraceCountResponse.class);
assertThat(response.workspacesTracesCount().size()).isEqualTo(1);
assertThat(response.workspacesTracesCount().get(0))
.isEqualTo(new TraceCountResponse.WorkspaceTraceCount(workspaceId, tracesCount));
}
}
}

private int setupTracesForWorkspace(String workspaceName, String workspaceId, String okApikey) {
mockTargetWorkspace(okApikey, workspaceName, workspaceId);

var traces = PodamFactoryUtils.manufacturePojoList(factory, Trace.class)
.stream()
.map(t -> t.toBuilder()
.projectId(null)
.projectName(DEFAULT_PROJECT)
.feedbackScores(null)
.build())
.toList();

traces.forEach(trace -> createTrace(trace, okApikey, workspaceName));

return traces.size();
}

private UUID createTrace(Trace trace, String apiKey, String workspaceName) {
try (var actualResponse = client.target(TRACE_RESOURCE_URL_TEMPLATE.formatted(baseURI))
.request()
.accept(MediaType.APPLICATION_JSON_TYPE)
.header(HttpHeaders.AUTHORIZATION, apiKey)
.header(WORKSPACE_HEADER, workspaceName)
.post(Entity.json(trace))) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(201);
assertThat(actualResponse.hasEntity()).isFalse();

var actualId = TestUtils.getIdFromLocation(actualResponse.getLocation());

if (trace.id() != null) {
assertThat(actualId).isEqualTo(trace.id());
}
return actualId;
}
}
}
Loading