diff --git a/apps/opik-backend/entrypoint.sh b/apps/opik-backend/entrypoint.sh
index 140cc93987..3dbb72594b 100644
--- a/apps/opik-backend/entrypoint.sh
+++ b/apps/opik-backend/entrypoint.sh
@@ -5,12 +5,13 @@ echo $(pwd)
 jwebserver -d /opt/opik/redoc -b 0.0.0.0 -p 3003 &

 echo "OPIK_VERSION=$OPIK_VERSION"
-echo "NEW_RELIC_ENABLED=$NEW_RELIC_ENABLED"
-echo "NEW_RELIC_VERSION=$NEW_RELIC_VERSION"
+echo "OPIK_OTEL_SDK_ENABLED=$OPIK_OTEL_SDK_ENABLED"
+echo "OTEL_VERSION=$OTEL_VERSION"

-if [[ "${NEW_RELIC_ENABLED}" == "true" && "${NEW_RELIC_LICENSE_KEY}" != "" ]];then
-  curl -o /tmp/newrelic-agent.jar https://download.newrelic.com/newrelic/java-agent/newrelic-agent/${NEW_RELIC_VERSION}/newrelic-agent-${NEW_RELIC_VERSION}.jar
-  JAVA_OPTS="$JAVA_OPTS -javaagent:/tmp/newrelic-agent.jar"
+if [[ "${OPIK_OTEL_SDK_ENABLED}" == "true" && "${OTEL_VERSION}" != "" && "${OTEL_EXPORTER_OTLP_ENDPOINT}" != "" ]];then
+  OTEL_RESOURCE_ATTRIBUTES="service.name=opik-backend,service.version=${OPIK_VERSION}"
+  curl -L -o /tmp/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v${OTEL_VERSION}/opentelemetry-javaagent.jar
+  JAVA_OPTS="$JAVA_OPTS -javaagent:/tmp/opentelemetry-javaagent.jar"
 fi

 # Check if ENABLE_VIRTUAL_THREADS is set to true
diff --git a/apps/opik-backend/pom.xml b/apps/opik-backend/pom.xml
index 4ed764fdff..c79b4ce5d5 100644
--- a/apps/opik-backend/pom.xml
+++ b/apps/opik-backend/pom.xml
@@ -30,6 +30,7 @@
         5.1.0
         3.9.1
         3.34.1
+        <opentelmetry.version>2.8.0</opentelmetry.version>
         2.25.70

         com.comet.opik.OpikApplication
@@ -62,9 +63,14 @@
         <dependency>
-            <groupId>com.newrelic.agent.java</groupId>
-            <artifactId>newrelic-api</artifactId>
-            <version>8.14.0</version>
+            <groupId>io.opentelemetry.instrumentation</groupId>
+            <artifactId>opentelemetry-instrumentation-annotations</artifactId>
+            <version>${opentelmetry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry.instrumentation</groupId>
+            <artifactId>opentelemetry-r2dbc-1.0</artifactId>
+            <version>${opentelmetry.version}-alpha</version>
         </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java
index 026ee5f77e..20317ad3c2 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java
@@ -12,8 +12,7 @@
 import com.comet.opik.utils.JsonUtils;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.inject.ImplementedBy;
-import com.newrelic.api.agent.Segment;
-import com.newrelic.api.agent.Trace;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.Result;
 import io.r2dbc.spi.Statement;
@@ -40,6 +39,8 @@

 import static com.comet.opik.api.DatasetItem.DatasetItemPage;
 import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
+import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.Segment;
+import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
 import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment;
 import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware;
 import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware;
@@ -408,7 +409,7 @@ LEFT JOIN (
     private final @NonNull FilterQueryBuilder filterQueryBuilder;

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> save(@NonNull UUID datasetId, @NonNull List<DatasetItem> items) {

         if (items.isEmpty()) {
@@ -455,7 +456,7 @@ private Mono<Long> mapAndInsert(
             return Flux.from(statement.execute())
                     .flatMap(Result::getRowsUpdated)
                     .reduce(0L, Long::sum)
-                    .doFinally(signalType -> segment.end());
+                    .doFinally(signalType -> endSegment(segment));
         });
     }
@@ -550,7 +551,7 @@ private List<FeedbackScore> getFeedbackScores(Object feedbackScoresRaw) {
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<DatasetItem> get(@NonNull UUID id) {
         return asyncTemplate.nonTransaction(connection -> {
@@ -560,14 +561,14 @@ public Mono<DatasetItem> get(@NonNull UUID id) {
             Segment segment = startSegment("dataset_items", "Clickhouse", "select_dataset_item");

             return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
-                    .doFinally(signalType -> segment.end())
+                    .doFinally(signalType -> endSegment(segment))
                     .flatMap(this::mapItem)
                     .singleOrEmpty();
         });
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Flux<DatasetItem> getItems(@NonNull UUID datasetId, int limit, UUID lastRetrievedId) {
         log.info("Getting dataset items by datasetId '{}', limit '{}', lastRetrievedId '{}'",
                 datasetId, limit, lastRetrievedId);
@@ -591,13 +592,13 @@ public Flux<DatasetItem> getItems(@NonNull UUID datasetId, int limit, UUID lastR
             Segment segment = startSegment("dataset_items", "Clickhouse", "select_dataset_items_stream");

             return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
-                    .doFinally(signalType -> segment.end())
+                    .doFinally(signalType -> endSegment(segment))
                     .flatMap(this::mapItem);
         });
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<List<WorkspaceAndResourceId>> getDatasetItemWorkspace(@NonNull Set<UUID> datasetItemIds) {

         if (datasetItemIds.isEmpty()) {
@@ -618,7 +619,7 @@ public Mono<List<WorkspaceAndResourceId>> getDatasetItemWorkspace(@NonNull Set<UUID> datasetItemIds) {

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> delete(@NonNull List<UUID> ids) {
         if (ids.isEmpty()) {
             return Mono.empty();
         }
@@ -633,7 +634,7 @@ public Mono<Long> delete(@NonNull List<UUID> ids) {
             return bindAndDelete(ids, statement)
                     .flatMap(Result::getRowsUpdated)
                     .reduce(0L, Long::sum)
-                    .doFinally(signalType -> segment.end());
+                    .doFinally(signalType -> endSegment(segment));
         });
     }
@@ -645,7 +646,7 @@ private Flux<? extends Result> bindAndDelete(List<UUID> ids, Statement statement
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int size) {
         Segment segmentCount = startSegment("dataset_items", "Clickhouse", "select_dataset_items_page_count");

@@ -656,7 +657,7 @@ public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int siz
                         .bind("datasetId", datasetId)
                         .bind("workspace_id", workspaceId)
                         .execute())
-                .doFinally(signalType -> segmentCount.end())
+                .doFinally(signalType -> endSegment(segmentCount))
                 .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, Long.class)))
                 .reduce(0L, Long::sum)
                 .flatMap(count -> {
@@ -672,7 +673,7 @@ public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int siz
                             .flatMap(this::mapItem)
                             .collectList()
                             .flatMap(items -> Mono.just(new DatasetItemPage(items, page, items.size(), count)))
-                            .doFinally(signalType -> segment.end());
+                            .doFinally(signalType -> endSegment(segment));
                 })));
     }
@@ -705,7 +706,7 @@ private void bindSearchCriteria(DatasetItemSearchCriteria datasetItemSearchCrite
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<DatasetItemPage> getItems(
             @NonNull DatasetItemSearchCriteria datasetItemSearchCriteria, int page, int size) {
         log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
@@ -725,7 +726,7 @@ public Mono<DatasetItemPage> getItems(
             bindSearchCriteria(datasetItemSearchCriteria, statement);

             return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
-                    .doFinally(signalType -> segmentCount.end())
+                    .doFinally(signalType -> endSegment(segmentCount))
                     .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, Long.class)))
                     .reduce(0L, Long::sum)
                     .flatMap(count -> {
@@ -744,7 +745,7 @@ public Mono<DatasetItemPage> getItems(
                 bindSearchCriteria(datasetItemSearchCriteria, selectStatement);

                 return makeFluxContextAware(bindWorkspaceIdToFlux(selectStatement))
-                        .doFinally(signalType -> segment.end())
+                        .doFinally(signalType -> endSegment(segment))
                         .flatMap(this::mapItem)
                         .collectList()
                        .flatMap(items -> Mono.just(new DatasetItemPage(items, page, items.size(), count)));
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemService.java
index e1a2ec04d2..d69a09c566 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemService.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemService.java
@@ -9,7 +9,7 @@
 import com.comet.opik.api.error.IdentifierMismatchException;
 import com.comet.opik.infrastructure.auth.RequestContext;
 import com.google.inject.ImplementedBy;
-import com.newrelic.api.agent.Trace;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import jakarta.ws.rs.ClientErrorException;
@@ -57,7 +57,7 @@ class DatasetItemServiceImpl implements DatasetItemService {
     private final @NonNull SpanService spanService;

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> save(@NonNull DatasetItemBatch batch) {
         if (batch.datasetId() == null && batch.datasetName() == null) {
             return Mono.error(failWithError("dataset_id or dataset_name must be provided"));
@@ -102,14 +102,13 @@ private ClientErrorException newConflict(String error) {
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<DatasetItem> get(@NonNull UUID id) {
         return dao.get(id)
                 .switchIfEmpty(Mono.defer(() -> Mono.error(failWithNotFound("Dataset item not found"))));
     }
-
-    @Override
-    @Trace(dispatcher = true)
+
+    @WithSpan
     public Flux<DatasetItem> getItems(@NonNull String workspaceId, @NonNull DatasetItemStreamRequest request) {
         log.info("Getting dataset items by '{}' on workspaceId '{}'", request, workspaceId);
         return Mono.fromCallable(() -> datasetService.findByName(workspaceId, request.datasetName()))
@@ -188,7 +187,7 @@ private NotFoundException failWithNotFound(String message) {
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> delete(@NonNull List<UUID> ids) {
         if (ids.isEmpty()) {
             return Mono.empty();
@@ -198,13 +197,13 @@ public Mono<Void> delete(@NonNull List<UUID> ids) {
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int size) {
         return dao.getItems(datasetId, page, size);
     }

     @Override
-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<DatasetItemPage> getItems(
             int page, int size, @NonNull DatasetItemSearchCriteria datasetItemSearchCriteria) {
         log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentDAO.java
index 2e24de9ffe..4be64b5fdd 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentDAO.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentDAO.java
@@ -6,6 +6,7 @@
 import com.comet.opik.utils.JsonUtils;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.ConnectionFactory;
 import io.r2dbc.spi.Result;
@@ -372,6 +373,7 @@ AND ilike(name, CONCAT('%', :name, '%'))
     private final @NonNull ConnectionFactory connectionFactory;

+    @WithSpan
     Mono<Void> insert(@NonNull Experiment experiment) {
         return Mono.from(connectionFactory.create())
                 .flatMapMany(connection -> insert(experiment, connection))
                 .then();
@@ -398,6 +400,7 @@ private String getOrDefault(JsonNode jsonNode) {
         return Optional.ofNullable(jsonNode).map(JsonNode::toString).orElse("");
     }

+    @WithSpan
     Mono<Experiment> getById(@NonNull UUID id) {
         return Mono.from(connectionFactory.create())
                 .flatMapMany(connection -> getById(id, connection))
@@ -447,6 +450,7 @@ private static List<FeedbackScoreAverage> getFeedbackScores(Row row) {
         return feedbackScoresAvg.isEmpty() ? null : feedbackScoresAvg;
     }

+    @WithSpan
     Mono<Experiment.ExperimentPage> find(
             int page, int size, @NonNull ExperimentSearchCriteria experimentSearchCriteria) {
         return countTotal(experimentSearchCriteria).flatMap(total -> find(page, size, experimentSearchCriteria, total));
@@ -507,6 +511,7 @@ private void bindSearchCriteria(Statement statement, ExperimentSearchCriteria cr
         }
     }

+    @WithSpan
     Flux<Experiment> findByName(String name) {
         Preconditions.checkArgument(StringUtils.isNotBlank(name), "Argument 'name' must not be blank");
         return Mono.from(connectionFactory.create())
@@ -520,6 +525,7 @@ private Publisher<? extends Result> findByName(String name, Connection connectio
         return makeFluxContextAware(bindWorkspaceIdToFlux(statement));
     }

+    @WithSpan
     public Flux<WorkspaceAndResourceId> getExperimentWorkspaces(@NonNull Set<UUID> experimentIds) {
         if (experimentIds.isEmpty()) {
             return Flux.empty();
@@ -535,6 +541,7 @@ public Flux<WorkspaceAndResourceId> getExperimentWorkspaces(@NonNull Set<UUID> e
                         row.get("id", UUID.class))));
     }

+    @WithSpan
     public Mono<Long> delete(Set<UUID> ids) {
         Preconditions.checkArgument(CollectionUtils.isNotEmpty(ids), "Argument 'ids' must not be empty");

diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java
index 2bf9d394d8..b456640e2a 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java
@@ -2,6 +2,7 @@

 import com.comet.opik.api.ExperimentItem;
 import com.google.common.base.Preconditions;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.ConnectionFactory;
 import io.r2dbc.spi.Result;
@@ -128,6 +129,7 @@ INSERT INTO experiment_items (
     private final @NonNull ConnectionFactory connectionFactory;

+    @WithSpan
     public Flux<ExperimentSummary> findExperimentSummaryByDatasetIds(Collection<UUID> datasetIds) {

         if (datasetIds.isEmpty()) {
@@ -148,6 +150,7 @@ public Flux<ExperimentSummary> findExperimentSummaryByDatasetIds(Collection<UUID
     }

+    @WithSpan
     public Mono<Long> insert(@NonNull Set<ExperimentItem> experimentItems) {

         Preconditions.checkArgument(CollectionUtils.isNotEmpty(experimentItems),
                 "Argument 'experimentItems' must not be empty");
@@ -207,6 +210,7 @@ private Publisher<ExperimentItem> mapToExperimentItem(Result result) {
                         .build());
     }

+    @WithSpan
     public Mono<ExperimentItem> get(@NonNull UUID id) {
         return Mono.from(connectionFactory.create())
                 .flatMapMany(connection -> get(id, connection))
@@ -251,6 +255,7 @@ private Publisher<? extends Result> getItems(
         return makeFluxContextAware(bindWorkspaceIdToFlux(statement));
     }

+    @WithSpan
     public Mono<Long> delete(Set<UUID> ids) {
         Preconditions.checkArgument(CollectionUtils.isNotEmpty(ids), "Argument 'ids' must not be empty");

@@ -270,6 +275,7 @@ private Publisher<? extends Result> delete(Set<UUID> ids, Connection connection)
         return makeFluxContextAware(bindWorkspaceIdToFlux(statement));
     }

+    @WithSpan
     public Mono<Long> deleteByExperimentIds(Set<UUID> experimentIds) {

         Preconditions.checkArgument(CollectionUtils.isNotEmpty(experimentIds),
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/FeedbackScoreDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/FeedbackScoreDAO.java
index e63db2530f..44b605fbf1 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/FeedbackScoreDAO.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/FeedbackScoreDAO.java
@@ -5,6 +5,7 @@
 import com.comet.opik.api.ScoreSource;
 import com.google.common.base.Preconditions;
 import com.google.inject.ImplementedBy;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.Result;
 import io.r2dbc.spi.Row;
@@ -256,6 +257,7 @@ AND entity_id IN (
             """;

     @Override
+    @WithSpan
     public Mono<Map<UUID, List<FeedbackScore>>> getScores(@NonNull EntityType entityType,
             @NonNull List<UUID> entityIds,
             @NonNull Connection connection) {
@@ -309,6 +311,7 @@ private FeedbackScoreDto mapFeedback(Row row) {
     }

     @Override
+    @WithSpan
     public Mono<Long> scoreEntity(@NonNull EntityType entityType,
             @NonNull UUID entityId,
             @NonNull FeedbackScore score,
@@ -343,6 +346,7 @@ public Mono<Long> scoreEntity(@NonNull EntityType entityType,
     }

     @Override
+    @WithSpan
     public Mono<Long> scoreBatchOf(@NonNull EntityType entityType,
             @NonNull List<FeedbackScoreBatchItem> scores,
             @NonNull Connection connection) {
@@ -401,6 +405,7 @@ public Mono<Long> scoreBatchOf(@NonNull EntityType entityType,
     }

     @Override
+    @WithSpan
     public Mono<Void> deleteScoreFrom(EntityType entityType, UUID id, String name, Connection connection) {
         var statement = connection.createStatement(DELETE_FEEDBACK_SCORE);

@@ -415,12 +420,14 @@ public Mono<Void> deleteScoreFrom(EntityType entityType, UUID id, String name, C
     }

     @Override
+    @WithSpan
     public Mono<Void> deleteByEntityId(
             @NonNull EntityType entityType, @NonNull UUID entityId, @NonNull Connection connection) {
         return deleteByEntityIds(entityType, Set.of(entityId), connection);
     }

     @Override
+    @WithSpan
     public Mono<Void> deleteByEntityIds(
             @NonNull EntityType entityType, Set<UUID> entityIds, @NonNull Connection connection) {
         Preconditions.checkArgument(
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java
index c532dd4de1..c130c32453 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java
@@ -8,8 +8,7 @@
 import com.comet.opik.utils.JsonUtils;
 import com.comet.opik.utils.TemplateUtils;
 import com.google.common.base.Preconditions;
-import com.newrelic.api.agent.Segment;
-import com.newrelic.api.agent.Trace;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.ConnectionFactory;
 import io.r2dbc.spi.Result;
@@ -39,6 +38,7 @@
 import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
 import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToMono;
 import static com.comet.opik.domain.FeedbackScoreDAO.EntityType;
+import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.Segment;
 import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
 import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment;
 import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware;
@@ -484,14 +484,14 @@ AND id in (
     private final @NonNull FeedbackScoreDAO feedbackScoreDAO;
     private final @NonNull FilterQueryBuilder filterQueryBuilder;

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> insert(@NonNull Span span) {
         return Mono.from(connectionFactory.create())
                 .flatMapMany(connection -> insert(span, connection))
                 .then();
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> batchInsert(@NonNull List<Span> spans) {

         Preconditions.checkArgument(!spans.isEmpty(), "Spans list must not be empty");
@@ -620,7 +620,7 @@ private Publisher<? extends Result> insert(Span span, Connection connection) {
         Segment segment = startSegment("spans", "Clickhouse", "insert");

         return makeFluxContextAware(bindUserNameAndWorkspaceContextToStream(statement))
-                .doFinally(signalType -> segment.end());
+                .doFinally(signalType -> endSegment(segment));
     }

     private ST newInsertTemplate(Span span) {
@@ -630,7 +630,7 @@ private ST newInsertTemplate(Span span) {
         return template;
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> update(@NonNull UUID id, @NonNull SpanUpdate spanUpdate) {
         return Mono.from(connectionFactory.create())
                 .flatMapMany(connection -> update(id, spanUpdate, connection))
@@ -638,7 +638,7 @@ public Mono<Long> update(@NonNull UUID id, @NonNull SpanUpdate spanUpdate) {
                 .flatMap(Result::getRowsUpdated)
                 .reduce(0L, Long::sum);
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> partialInsert(@NonNull UUID id, @NonNull UUID projectId, @NonNull SpanUpdate spanUpdate) {
         return Mono.from(connectionFactory.create())
                 .flatMapMany(connection -> {
@@ -661,7 +661,7 @@ public Mono<Long> partialInsert(@NonNull UUID id, @NonNull UUID projectId, @NonN
                     Segment segment = startSegment("spans", "Clickhouse", "partial_insert");

                     return makeFluxContextAware(bindUserNameAndWorkspaceContextToStream(statement))
-                            .doFinally(signalType -> segment.end());
+                            .doFinally(signalType -> endSegment(segment));
                 })
                 .flatMap(Result::getRowsUpdated)
                 .reduce(0L, Long::sum);
@@ -676,7 +676,7 @@ private Publisher<? extends Result> update(UUID id, SpanUpdate spanUpdate, Conne
         Segment segment = startSegment("spans", "Clickhouse", "update");

         return makeFluxContextAware(bindUserNameAndWorkspaceContextToStream(statement))
-                .doFinally(signalType -> segment.end());
+                .doFinally(signalType -> endSegment(segment));
     }

     private void bindUpdateParams(SpanUpdate spanUpdate, Statement statement) {
@@ -721,7 +721,7 @@ private ST newUpdateTemplate(SpanUpdate spanUpdate, String sql) {
         return template;
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Span> getById(@NonNull UUID id) {
         log.info("Getting span by id '{}'", id);
         return Mono.from(connectionFactory.create())
@@ -740,15 +740,15 @@ private Publisher<? extends Result> getById(UUID id, Connection connection) {
         Segment segment = startSegment("spans", "Clickhouse", "get_by_id");

         return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
-                .doFinally(signalType -> segment.end());
+                .doFinally(signalType -> endSegment(segment));
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> deleteByTraceId(@NonNull UUID traceId, @NonNull Connection connection) {
         return deleteByTraceIds(Set.of(traceId), connection);
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> deleteByTraceIds(Set<UUID> traceIds, @NonNull Connection connection) {
         Preconditions.checkArgument(
                 CollectionUtils.isNotEmpty(traceIds), "Argument 'traceIds' must not be empty");
@@ -757,7 +757,7 @@ public Mono<Void> deleteByTraceIds(Set<UUID> traceIds, @NonNull Connection conne
                 .bind("trace_ids", traceIds);
         var segment = startSegment("spans", "Clickhouse", "delete_by_trace_id");
         return makeMonoContextAware(bindWorkspaceIdToMono(statement))
-                .doFinally(signalType -> segment.end())
+                .doFinally(signalType -> endSegment(segment))
                 .then();
     }

@@ -800,7 +800,7 @@ private Publisher<Span> mapToDto(Result result) {
         });
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Span.SpanPage> find(int page, int size, @NonNull SpanSearchCriteria spanSearchCriteria) {
         log.info("Finding span by '{}'", spanSearchCriteria);
         return countTotal(spanSearchCriteria).flatMap(total -> find(page, size, spanSearchCriteria, total));
@@ -825,7 +825,7 @@ private Mono<List<Span>> enhanceWithFeedbackScores(List<Span> spans, Connection
                 .map(scoresMap -> spans.stream()
                         .map(span -> span.toBuilder().feedbackScores(scoresMap.get(span.id())).build())
                         .toList())
-                .doFinally(signalType -> segment.end());
+                .doFinally(signalType -> endSegment(segment));
     }

     private Publisher<? extends Result> find(int page, int size, SpanSearchCriteria spanSearchCriteria,
@@ -842,7 +842,7 @@ private Publisher<? extends Result> find(int page, int size, SpanSearchCriteria
         Segment segment = startSegment("spans", "Clickhouse", "find");

         return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
-                .doFinally(signalType -> segment.end());
+                .doFinally(signalType -> endSegment(segment));
     }

     private Mono<Long> countTotal(SpanSearchCriteria spanSearchCriteria) {
@@ -862,7 +862,7 @@ private Publisher<? extends Result> countTotal(SpanSearchCriteria spanSearchCrit
         Segment segment = startSegment("spans", "Clickhouse", "count_total");

         return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
-                .doFinally(signalType -> segment.end());
+                .doFinally(signalType -> endSegment(segment));
     }

     private ST newFindTemplate(String query, SpanSearchCriteria spanSearchCriteria) {
@@ -893,7 +893,7 @@ private void bindSearchCriteria(Statement statement, SpanSearchCriteria spanSear
         });
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<List<WorkspaceAndResourceId>> getSpanWorkspace(@NonNull Set<UUID> spanIds) {
         if (spanIds.isEmpty()) {
             return Mono.just(List.of());
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java
index 648708b59d..1c2f43771b 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java
@@ -13,7 +13,7 @@
 import com.comet.opik.infrastructure.lock.LockService;
 import com.comet.opik.utils.WorkspaceUtils;
 import com.google.common.base.Preconditions;
-import com.newrelic.api.agent.Trace;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import jakarta.ws.rs.NotFoundException;
@@ -50,7 +50,7 @@ public class SpanService {
     private final @NonNull IdGenerator idGenerator;
     private final @NonNull LockService lockService;

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Span.SpanPage> find(int page, int size, @NonNull SpanSearchCriteria searchCriteria) {
         log.info("Finding span by '{}'", searchCriteria);

@@ -75,13 +75,13 @@ private Mono<List<Project>> findProject(SpanSearchCriteria searchCriteria) {
         });
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Span> getById(@NonNull UUID id) {
         log.info("Getting span by id '{}'", id);
         return spanDAO.getById(id).switchIfEmpty(Mono.defer(() -> Mono.error(newNotFoundException(id))));
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<UUID> create(@NonNull Span span) {
         var id = span.id() == null ? idGenerator.generateId() : span.id();
         var projectName = WorkspaceUtils.getProjectName(span.projectName());
@@ -158,7 +158,7 @@ private Mono<Project> handleProjectCreationError(Throwable exception, String pro
         };
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> update(@NonNull UUID id, @NonNull SpanUpdate spanUpdate) {
         log.info("Updating span with id '{}'", id);

@@ -254,7 +254,7 @@ public Mono<Boolean> validateSpanWorkspace(@NonNull String workspaceId, @NonNull
                 .map(spanWorkspace -> spanWorkspace.stream().allMatch(span -> workspaceId.equals(span.workspaceId())));
     }

-    @Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> create(@NonNull SpanBatch batch) {

         Preconditions.checkArgument(!batch.spans().isEmpty(), "Batch spans must not be empty");
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java
index 86b934ab21..42f23ccd76 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java
@@ -10,7 +10,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import com.google.inject.ImplementedBy;
-import com.newrelic.api.agent.Segment;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.Result;
 import io.r2dbc.spi.Statement;
@@ -41,6 +41,7 @@
 import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
 import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToMono;
 import static com.comet.opik.domain.FeedbackScoreDAO.EntityType;
+import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.Segment;
 import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
 import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment;
 import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware;
@@ -513,7 +514,7 @@ LEFT JOIN (
     private final @NonNull FilterQueryBuilder filterQueryBuilder;

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<UUID> insert(@NonNull Trace trace, @NonNull Connection connection) {

         ST template = buildInsertTemplate(trace);
@@ -576,7 +577,7 @@ private ST buildInsertTemplate(Trace trace) {
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> update(@NonNull TraceUpdate traceUpdate, @NonNull UUID id, @NonNull Connection connection) {
         return update(id, traceUpdate, connection).then();
     }
@@ -653,12 +654,13 @@ private Flux<? extends Result> getById(UUID id, Connection connection) {
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> delete(@NonNull UUID id, @NonNull Connection connection) {
         return delete(Set.of(id), connection);
     }

     @Override
+    @WithSpan
     public Mono<Void> delete(Set<UUID> ids, @NonNull Connection connection) {
         Preconditions.checkArgument(CollectionUtils.isNotEmpty(ids), "Argument 'ids' must not be empty");
         log.info("Deleting traces, count '{}'", ids.size());
@@ -671,7 +673,7 @@ public Mono<Void> delete(Set<UUID> ids, @NonNull Connection connection) {
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Trace> findById(@NonNull UUID id, @NonNull Connection connection) {
         return getById(id, connection)
                 .flatMap(this::mapToDto)
@@ -712,7 +714,7 @@ private Publisher<Trace> mapToDto(Result result) {
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Trace.TracePage> find(
             int size, int page, @NonNull TraceSearchCriteria traceSearchCriteria, @NonNull Connection connection) {
         return countTotal(traceSearchCriteria, connection)
@@ -725,7 +727,7 @@ public Mono<Trace.TracePage> find(
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> partialInsert(
             @NonNull UUID projectId,
             @NonNull TraceUpdate traceUpdate,
@@ -813,7 +815,7 @@ private void bindSearchCriteria(TraceSearchCriteria traceSearchCriteria, Stateme
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<List<WorkspaceAndResourceId>> getTraceWorkspace(
             @NonNull Set<UUID> traceIds, @NonNull Connection connection) {

@@ -835,6 +837,7 @@ public Mono<List<WorkspaceAndResourceId>> getTraceWorkspace(
     }

     @Override
+    @WithSpan
     public Mono<Long> batchInsert(@NonNull List<Trace> traces, @NonNull Connection connection) {

         Preconditions.checkArgument(!traces.isEmpty(), "traces must not be empty");
@@ -891,7 +894,7 @@ private String getOrDefault(JsonNode value) {
         return value != null ? value.toString() : "";
     }

-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Flux<WorkspaceTraceCount> countTracesPerWorkspace(Connection connection) {

         var statement = connection.createStatement(TRACE_COUNT_BY_WORKSPACE_ID);
@@ -903,6 +906,7 @@ public Flux<WorkspaceTraceCount> countTracesPerWorkspace(Connection connection)
     }

     @Override
+    @WithSpan
     public Mono<Map<UUID, Instant>> getLastUpdatedTraceAt(
             @NonNull Set<UUID> projectIds, @NonNull String workspaceId, @NonNull Connection connection) {

diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java
index e864530bc0..404cf249eb 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java
@@ -17,6 +17,7 @@
 import com.comet.opik.utils.WorkspaceUtils;
 import com.google.common.base.Preconditions;
 import com.google.inject.ImplementedBy;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import jakarta.ws.rs.NotFoundException;
@@ -79,7 +80,7 @@ class TraceServiceImpl implements TraceService {
     private final @NonNull LockService lockService;

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<UUID> create(@NonNull Trace trace) {

         String projectName = WorkspaceUtils.getProjectName(trace.projectName());
@@ -93,7 +94,7 @@ public Mono<UUID> create(@NonNull Trace trace) {
                         Mono.defer(() -> insertTrace(trace, project, id))));
     }

-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Long> create(TraceBatch batch) {

         Preconditions.checkArgument(!batch.traces().isEmpty(), "Batch traces cannot be empty");
@@ -211,7 +212,7 @@ private Mono<Project> handleProjectCreationError(Throwable exception, String pro
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> update(@NonNull TraceUpdate traceUpdate, @NonNull UUID id) {

         var projectName = WorkspaceUtils.getProjectName(traceUpdate.projectName());
@@ -265,14 +266,14 @@ private NotFoundException failWithNotFound(String error) {
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Trace> get(@NonNull UUID id) {
         return template.nonTransaction(connection -> dao.findById(id, connection))
                 .switchIfEmpty(Mono.defer(() -> Mono.error(failWithNotFound("Trace not found"))));
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> delete(@NonNull UUID id) {
         log.info("Deleting trace by id '{}'", id);
         return lockService.executeWithLock(
@@ -286,7 +287,7 @@ public Mono<Void> delete(@NonNull UUID id) {
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Void> delete(Set<UUID> ids) {
         Preconditions.checkArgument(CollectionUtils.isNotEmpty(ids), "Argument 'ids' must not be empty");
         log.info("Deleting traces, count '{}'", ids.size());
@@ -298,7 +299,7 @@ public Mono<Void> delete(Set<UUID> ids) {
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Trace.TracePage> find(int page, int size, @NonNull TraceSearchCriteria criteria) {

         if (criteria.projectId() != null) {
@@ -312,7 +313,7 @@ public Mono<Trace.TracePage> find(int page, int size, @NonNull TraceSearchCriter
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<Boolean> validateTraceWorkspace(@NonNull String workspaceId, @NonNull Set<UUID> traceIds) {
         if (traceIds.isEmpty()) {
             return Mono.just(true);
@@ -324,7 +325,7 @@ public Mono<Boolean> validateTraceWorkspace(@NonNull String workspaceId, @NonNul
     }

     @Override
-    @com.newrelic.api.agent.Trace(dispatcher = true)
+    @WithSpan
     public Mono<TraceCountResponse> countTracesPerWorkspace() {
         return template.stream(dao::countTracesPerWorkspace)
                 .collectList()
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java
index f50c65ecf6..6edfae177c 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java
@@ -3,7 +3,11 @@
 import com.comet.opik.infrastructure.DatabaseAnalyticsFactory;
 import com.comet.opik.infrastructure.OpikConfiguration;
 import com.google.inject.Provides;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.instrumentation.r2dbc.v1_0.R2dbcTelemetry;
 import io.r2dbc.spi.ConnectionFactory;
+import io.r2dbc.spi.ConnectionFactoryOptions;
 import jakarta.inject.Named;
 import jakarta.inject.Singleton;
 import ru.vyarus.dropwizard.guice.module.support.DropwizardAwareModule;
@@ -22,7 +26,8 @@ protected void configure() {
     @Provides
     @Singleton
     public ConnectionFactory getConnectionFactory() {
-        return connectionFactory;
+        return R2dbcTelemetry.create(GlobalOpenTelemetry.get())
+                .wrapConnectionFactory(connectionFactory, ConnectionFactoryOptions.builder().build());
     }

     @Provides
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java
index 0ba6e20572..c0d0007dbb 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/instrumentation/InstrumentAsyncUtils.java
@@ -1,8 +1,10 @@
 package com.comet.opik.infrastructure.instrumentation;

-import com.newrelic.api.agent.DatastoreParameters;
-import com.newrelic.api.agent.NewRelic;
-import com.newrelic.api.agent.Segment;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.scheduler.Schedulers;
@@ -11,17 +13,20 @@
 @UtilityClass
 public class InstrumentAsyncUtils {

+    public record Segment(Scope scope, Span span) {}
+
     public static Segment startSegment(String segmentName, String product, String operationName) {

-        Segment segment = NewRelic.getAgent().getTransaction().startSegment(segmentName);
+        Tracer tracer = GlobalOpenTelemetry.get().getTracer("com.comet.opik");

-        segment.reportAsExternal(DatastoreParameters
-                .product(product)
-                .collection(null)
-                .operation(operationName)
-                .build());
+        Span span = tracer
+                .spanBuilder("custom-reactive-%s".formatted(segmentName))
+                .setParent(Context.current().with(Span.current()))
+                .startSpan()
+                .setAttribute("product", product)
+                .setAttribute("operation", operationName);

-        return segment;
+        return new Segment(span.makeCurrent(), span);
     }

@@ -30,7 +35,8 @@ public static void endSegment(Segment segment) {

         Schedulers.boundedElastic().schedule(() -> {
             try {
                 // End the segment
-                segment.end();
+                segment.scope().close();
+                segment.span().end();
             } catch (Exception e) {
                 log.warn("Failed to end segment", e);
diff --git a/deployment/docker-compose/docker-compose.yaml b/deployment/docker-compose/docker-compose.yaml
index bbe9b65016..cd5da6f4b1 100644
--- a/deployment/docker-compose/docker-compose.yaml
+++ b/deployment/docker-compose/docker-compose.yaml
@@ -85,6 +85,13 @@ services:
       JAVA_OPTS: "-Dliquibase.propertySubstitutionEnabled=true"
       REDIS_URL: redis://:opik@redis:6379/
       ANALYTICS_DB_PASS: opik
+      OPIK_OTEL_SDK_ENABLED: false
+      OTEL_VERSION: 2.8.0
+      OTEL_PROPAGATORS: "tracecontext,baggage,b3"
+      OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED: true
+      OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION: BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
+      OTEL_EXPERIMENTAL_RESOURCE_DISABLED_KEYS: process.command_args
+      OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: delta
       OPIK_USAGE_REPORT_ENABLED: ${OPIK_USAGE_REPORT_ENABLED:-true}
     ports:
       - "8080"
diff --git a/deployment/helm_chart/opik/README.md b/deployment/helm_chart/opik/README.md
index 0d8ab6e0c3..dde8d09a49 100644
--- a/deployment/helm_chart/opik/README.md
+++ b/deployment/helm_chart/opik/README.md
@@ -109,6 +109,13 @@ Call opik api on http://localhost:5173/api
 | component.backend.env.ANALYTICS_DB_PROTOCOL | string | `"HTTP"` |  |
 | component.backend.env.ANALYTICS_DB_USERNAME | string | `"opik"` |  |
 | component.backend.env.JAVA_OPTS | string | `"-Dliquibase.propertySubstitutionEnabled=true"` |  |
+| component.backend.env.OPIK_OTEL_SDK_ENABLED | bool | `false` |  |
+| component.backend.env.OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED | bool | `true` |  |
+| component.backend.env.OTEL_EXPERIMENTAL_RESOURCE_DISABLED_KEYS | string | `"process.command_args"` |  |
+| component.backend.env.OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION | string | `"BASE2_EXPONENTIAL_BUCKET_HISTOGRAM"` |  |
+| component.backend.env.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE | string | `"delta"` |  |
+| component.backend.env.OTEL_PROPAGATORS | string | `"tracecontext,baggage,b3"` |  |
+| component.backend.env.OTEL_VERSION | string | `"2.8.0"` |  |
 | component.backend.env.REDIS_URL | string | `"redis://:wFSuJX9nDBdCa25sKZG7bh@opik-redis-master:6379/"` |  |
 | component.backend.env.STATE_DB_DATABASE_NAME | string | `"opik"` |  |
 | component.backend.env.STATE_DB_PASS | string | `"opik"` |  |
diff --git a/deployment/helm_chart/opik/values.yaml b/deployment/helm_chart/opik/values.yaml
index 4278091ddf..8b741a659a 100644
--- a/deployment/helm_chart/opik/values.yaml
+++ b/deployment/helm_chart/opik/values.yaml
@@ -56,7 +56,13 @@ component:
       ANALYTICS_DB_MIGRATIONS_PASS: opik
       ANALYTICS_DB_PASS: opik
       STATE_DB_PASS: opik
-
+      OPIK_OTEL_SDK_ENABLED: false
+      OTEL_VERSION: 2.8.0
+      OTEL_PROPAGATORS: "tracecontext,baggage,b3"
+      OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED: true
+      OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION: BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
+      OTEL_EXPERIMENTAL_RESOURCE_DISABLED_KEYS: process.command_args
+      OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: delta
     envFrom:
       - configMapRef:
           name: opik-backend
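
A note on the annotation swap above: `@WithSpan` from `opentelemetry-instrumentation-annotations` records nothing by itself; it only takes effect when the OpenTelemetry javaagent that `entrypoint.sh` now downloads is attached, which is also why tracing stays off by default (`OPIK_OTEL_SDK_ENABLED: false`, plus a required `OTEL_EXPORTER_OTLP_ENDPOINT`). A minimal sketch with hypothetical class and method names:

```java
import io.opentelemetry.instrumentation.annotations.WithSpan;
import reactor.core.publisher.Mono;

// Hypothetical example, not part of this PR. With the javaagent attached,
// each call is recorded as a span named "ExampleDao.loadGreeting";
// without the agent the annotation is ignored and the method runs as-is.
class ExampleDao {

    @WithSpan
    Mono<String> loadGreeting(String name) {
        return Mono.just("Hello, " + name);
    }
}
```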
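The reworked `InstrumentAsyncUtils` keeps the old New Relic "segment" idea: a hand-opened span that brackets one ClickHouse statement inside a larger reactive pipeline, ended from `doFinally` when the stream terminates rather than when the enclosing method returns. A self-contained sketch of that pattern using only the OTel API and Reactor (names are illustrative; the PR's version additionally calls `span.makeCurrent()` and closes the resulting `Scope` inside `endSegment`, deferred to a `boundedElastic` scheduler with a try/catch guard):

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import reactor.core.publisher.Mono;

class ReactiveSegmentSketch {

    private static final Tracer TRACER = GlobalOpenTelemetry.get().getTracer("com.comet.opik");

    static Mono<String> timedQuery() {
        // Opened eagerly, like startSegment("spans", "Clickhouse", "select")...
        Span span = TRACER.spanBuilder("custom-reactive-select")
                .startSpan()
                .setAttribute("product", "Clickhouse")
                .setAttribute("operation", "select");

        // ...but ended only when the stream terminates (complete, error or
        // cancel), which is what .doFinally(signalType -> endSegment(segment))
        // achieves in the DAOs above.
        return Mono.just("row")
                .doFinally(signalType -> span.end());
    }
}
```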
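Finally, `DatabaseAnalyticsModule` now hands Guice a `ConnectionFactory` wrapped by the `opentelemetry-r2dbc-1.0` instrumentation, so statements issued through R2DBC are reported as database client spans under whatever span is current. The wrapping in isolation, as a sketch (the module applies it to its existing ClickHouse factory rather than taking a parameter):

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.r2dbc.v1_0.R2dbcTelemetry;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;

class TracedConnectionFactorySketch {

    // Wraps any R2DBC ConnectionFactory so that statement execution is traced;
    // with the global OTel instance unconfigured this degrades to a no-op.
    static ConnectionFactory traced(ConnectionFactory delegate) {
        return R2dbcTelemetry.create(GlobalOpenTelemetry.get())
                .wrapConnectionFactory(delegate, ConnectionFactoryOptions.builder().build());
    }
}
```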