Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OPIK-529] Add error_info to spans and traces #885

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotBlank;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ErrorInfo(
@NotBlank String exceptionType,
String message,
@NotBlank String traceback) {
public static final TypeReference<ErrorInfo> ERROR_INFO_TYPE = new TypeReference<>() {
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public record Span(
@JsonView({Span.View.Public.class, Span.View.Write.class}) String provider,
@JsonView({Span.View.Public.class, Span.View.Write.class}) Set<String> tags,
@JsonView({Span.View.Public.class, Span.View.Write.class}) Map<String, Integer> usage,
@JsonView({Span.View.Public.class, Span.View.Write.class}) ErrorInfo errorInfo,
@JsonView({Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant createdAt,
@JsonView({Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant lastUpdatedAt,
@JsonView({Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) String createdBy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ public record SpanUpdate(
String model,
String provider,
Set<String> tags,
Map<String, Integer> usage) {
Map<String, Integer> usage,
ErrorInfo errorInfo) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public record Trace(
@JsonView({Trace.View.Public.class, Trace.View.Write.class}) JsonNode output,
@JsonView({Trace.View.Public.class, Trace.View.Write.class}) JsonNode metadata,
@JsonView({Trace.View.Public.class, Trace.View.Write.class}) Set<String> tags,
@JsonView({Trace.View.Public.class, Trace.View.Write.class}) ErrorInfo errorInfo,
@JsonView({Trace.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Map<String, Long> usage,
@JsonView({Trace.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant createdAt,
@JsonView({Trace.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant lastUpdatedAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ public record TraceUpdate(
JsonNode input,
JsonNode output,
JsonNode metadata,
Set<String> tags) {
Set<String> tags,
ErrorInfo errorInfo) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,13 @@ public Response update(@PathParam("id") UUID id,

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Updating span with id '{}' on workspaceId '{}'", id, workspaceId);
log.info("Updating trace with id '{}' on workspaceId '{}'", id, workspaceId);

service.update(trace, id)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

log.info("Updated span with id '{}' on workspaceId '{}'", id, workspaceId);
log.info("Updated trace with id '{}' on workspaceId '{}'", id, workspaceId);

return Response.noContent().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.comet.opik.api.ErrorInfo.ERROR_INFO_TYPE;
import static com.comet.opik.domain.AsyncContextUtils.bindUserNameAndWorkspaceContextToStream;
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToMono;
Expand Down Expand Up @@ -77,6 +78,7 @@ INSERT INTO spans(
total_estimated_cost_version,
tags,
usage,
error_info,
created_by,
last_updated_by
) VALUES
Expand All @@ -100,6 +102,7 @@ INSERT INTO spans(
:total_estimated_cost_version<item.index>,
:tags<item.index>,
mapFromArrays(:usage_keys<item.index>, :usage_values<item.index>),
:error_info<item.index>,
:created_by<item.index>,
:last_updated_by<item.index>
)
Expand Down Expand Up @@ -134,6 +137,7 @@ INSERT INTO spans(
total_estimated_cost_version,
tags,
usage,
error_info,
created_at,
created_by,
last_updated_by
Expand Down Expand Up @@ -212,6 +216,10 @@ INSERT INTO spans(
notEmpty(mapKeys(old_span.usage)), old_span.usage,
new_span.usage
) as usage,
multiIf(
LENGTH(old_span.error_info) > 0, old_span.error_info,
new_span.error_info
) as error_info,
multiIf(
notEquals(old_span.created_at, toDateTime64('1970-01-01 00:00:00.000', 9)) AND old_span.created_at >= toDateTime64('1970-01-01 00:00:00.000', 9), old_span.created_at,
new_span.created_at
Expand Down Expand Up @@ -241,6 +249,7 @@ INSERT INTO spans(
:total_estimated_cost_version as total_estimated_cost_version,
:tags as tags,
mapFromArrays(:usage_keys, :usage_values) as usage,
:error_info as error_info,
now64(9) as created_at,
:user_name as created_by,
:user_name as last_updated_by
Expand Down Expand Up @@ -281,6 +290,7 @@ INSERT INTO spans (
total_estimated_cost_version,
tags,
usage,
error_info,
created_at,
created_by,
last_updated_by
Expand All @@ -303,6 +313,7 @@ INSERT INTO spans (
<if(total_estimated_cost_version)> :total_estimated_cost_version <else> total_estimated_cost_version <endif> as total_estimated_cost_version,
<if(tags)> :tags <else> tags <endif> as tags,
<if(usage)> CAST((:usageKeys, :usageValues), 'Map(String, Int64)') <else> usage <endif> as usage,
<if(error_info)> :error_info <else> error_info <endif> as error_info,
created_at,
created_by,
:user_name as last_updated_by
Expand All @@ -327,7 +338,7 @@ INSERT INTO spans (
private static final String PARTIAL_INSERT = """
INSERT INTO spans(
id, project_id, workspace_id, trace_id, parent_span_id, name, type,
start_time, end_time, input, output, metadata, model, provider, total_estimated_cost, total_estimated_cost_version, tags, usage, created_at,
start_time, end_time, input, output, metadata, model, provider, total_estimated_cost, total_estimated_cost_version, tags, usage, error_info, created_at,
created_by, last_updated_by
)
SELECT
Expand Down Expand Up @@ -414,6 +425,11 @@ INSERT INTO spans(
notEmpty(mapKeys(old_span.usage)), old_span.usage,
new_span.usage
) as usage,
multiIf(
LENGTH(new_span.error_info) > 0, new_span.error_info,
LENGTH(old_span.error_info) > 0, old_span.error_info,
new_span.error_info
) as error_info,
multiIf(
notEquals(old_span.created_at, toDateTime64('1970-01-01 00:00:00.000', 9)) AND old_span.created_at >= toDateTime64('1970-01-01 00:00:00.000', 9), old_span.created_at,
new_span.created_at
Expand Down Expand Up @@ -443,6 +459,7 @@ INSERT INTO spans(
<if(total_estimated_cost_version)> :total_estimated_cost_version <else> '' <endif> as total_estimated_cost_version,
<if(tags)> :tags <else> [] <endif> as tags,
<if(usage)> CAST((:usageKeys, :usageValues), 'Map(String, Int64)') <else> mapFromArrays([], []) <endif> as usage,
<if(error_info)> :error_info <else> '' <endif> as error_info,
now64(9) as created_at,
:user_name as created_by,
:user_name as last_updated_by
Expand Down Expand Up @@ -490,6 +507,7 @@ LEFT JOIN (
total_estimated_cost,
tags,
usage,
error_info,
created_at,
last_updated_at,
created_by,
Expand Down Expand Up @@ -732,6 +750,8 @@ private Publisher<? extends Result> insert(List<Span> spans, Connection connecti
.bind("total_estimated_cost_version" + i,
estimatedCost.compareTo(BigDecimal.ZERO) > 0 ? ESTIMATED_COST_VERSION : "")
.bind("tags" + i, span.tags() != null ? span.tags().toArray(String[]::new) : new String[]{})
.bind("error_info" + i,
span.errorInfo() != null ? JsonUtils.readTree(span.errorInfo()).toString() : "")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why you need the readTree call here in the write path. It should be enough to just write the errorInfo record.

There are multiple occurrences of this below in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErrorInfo is a record with fixed fields, it's not a JsonNode, so if I write it without JsonUtils, it will be something like this in DB:

ErrorInfo[exceptionType=V_7ivWNh_f, message=zXvv1lMJwe, traceback=M3ZVOqxH0Y]

Then it would be impossible to deserialize and all Clickhouse Json related functions won't work as well.

I chose record Object instead of JsonNode since it will have fixed fileds, not arbitrary Json as metadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrescrz we could use here JsonUtils.writeValueAsString instead if you prefer

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about it twice, I think the most similar approach to other existing Json fields is to convert it to a JsonNode as you're doing. So this is fine.

.bind("created_by" + i, userName)
.bind("last_updated_by" + i, userName);

Expand Down Expand Up @@ -846,6 +866,12 @@ private Publisher<? extends Result> insert(Span span, Connection connection) {
statement.bind("usage_values", new Integer[]{});
}

if (span.errorInfo() != null) {
statement.bind("error_info", JsonUtils.readTree(span.errorInfo()).toString());
} else {
statement.bind("error_info", "");
}

Segment segment = startSegment("spans", "Clickhouse", "insert");

return makeFluxContextAware(bindUserNameAndWorkspaceContextToStream(statement))
Expand Down Expand Up @@ -944,6 +970,8 @@ private void bindUpdateParams(SpanUpdate spanUpdate, Statement statement) {
.ifPresent(model -> statement.bind("model", model));
Optional.ofNullable(spanUpdate.provider())
.ifPresent(provider -> statement.bind("provider", provider));
Optional.ofNullable(spanUpdate.errorInfo())
.ifPresent(errorInfo -> statement.bind("error_info", JsonUtils.readTree(errorInfo).toString()));

if (StringUtils.isNotBlank(spanUpdate.model()) && Objects.nonNull(spanUpdate.usage())) {
statement.bind("total_estimated_cost",
Expand All @@ -970,6 +998,8 @@ private ST newUpdateTemplate(SpanUpdate spanUpdate, String sql) {
.ifPresent(endTime -> template.add("end_time", endTime.toString()));
Optional.ofNullable(spanUpdate.usage())
.ifPresent(usage -> template.add("usage", usage.toString()));
Optional.ofNullable(spanUpdate.errorInfo())
.ifPresent(errorInfo -> template.add("error_info", JsonUtils.readTree(errorInfo).toString()));
if (StringUtils.isNotBlank(spanUpdate.model()) && Objects.nonNull(spanUpdate.usage())) {
template.add("total_estimated_cost", "total_estimated_cost");
template.add("total_estimated_cost_version", "total_estimated_cost_version");
Expand Down Expand Up @@ -1056,6 +1086,10 @@ private Publisher<Span> mapToDto(Result result) {
.filter(set -> !set.isEmpty())
.orElse(null))
.usage(row.get("usage", Map.class))
.errorInfo(Optional.ofNullable(row.get("error_info", String.class))
.filter(str -> !str.isBlank())
.map(errorInfo -> JsonUtils.readValue(errorInfo, ERROR_INFO_TYPE))
.orElse(null))
.createdAt(row.get("created_at", Instant.class))
.lastUpdatedAt(row.get("last_updated_at", Instant.class))
.createdBy(row.get("created_by", String.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static com.comet.opik.api.ErrorInfo.ERROR_INFO_TYPE;
import static com.comet.opik.api.Trace.TracePage;
import static com.comet.opik.api.TraceCountResponse.WorkspaceTraceCount;
import static com.comet.opik.domain.AsyncContextUtils.bindUserNameAndWorkspaceContext;
Expand Down Expand Up @@ -104,6 +105,7 @@ INSERT INTO traces(
output,
metadata,
tags,
error_info,
created_by,
last_updated_by
) VALUES
Expand All @@ -119,6 +121,7 @@ INSERT INTO traces(
:output<item.index>,
:metadata<item.index>,
:tags<item.index>,
:error_info<item.index>,
:user_name,
:user_name
)
Expand All @@ -145,6 +148,7 @@ INSERT INTO traces(
output,
metadata,
tags,
error_info,
created_at,
created_by,
last_updated_by
Expand Down Expand Up @@ -189,6 +193,10 @@ INSERT INTO traces(
notEmpty(old_trace.tags), old_trace.tags,
new_trace.tags
) as tags,
multiIf(
LENGTH(old_trace.error_info) > 0, old_trace.error_info,
new_trace.error_info
) as error_info,
multiIf(
notEquals(old_trace.created_at, toDateTime64('1970-01-01 00:00:00.000', 9)) AND old_trace.created_at >= toDateTime64('1970-01-01 00:00:00.000', 9), old_trace.created_at,
new_trace.created_at
Expand All @@ -210,6 +218,7 @@ INSERT INTO traces(
:output as output,
:metadata as metadata,
:tags as tags,
:error_info as error_info,
now64(9) as created_at,
:user_name as created_by,
:user_name as last_updated_by
Expand All @@ -231,7 +240,7 @@ LEFT JOIN (
***/
private static final String UPDATE = """
INSERT INTO traces (
id, project_id, workspace_id, name, start_time, end_time, input, output, metadata, tags, created_at, created_by, last_updated_by
id, project_id, workspace_id, name, start_time, end_time, input, output, metadata, tags, error_info, created_at, created_by, last_updated_by
) SELECT
id,
project_id,
Expand All @@ -243,6 +252,7 @@ INSERT INTO traces (
<if(output)> :output <else> output <endif> as output,
<if(metadata)> :metadata <else> metadata <endif> as metadata,
<if(tags)> :tags <else> tags <endif> as tags,
<if(error_info)> :error_info <else> error_info <endif> as error_info,
created_at,
created_by,
:user_name as last_updated_by
Expand Down Expand Up @@ -302,6 +312,7 @@ LEFT JOIN (
<if(truncate)> replaceRegexpAll(output, '<truncate>', '"[image]"') as output <else> output <endif>,
<if(truncate)> replaceRegexpAll(metadata, '<truncate>', '"[image]"') as metadata <else> metadata <endif>,
tags,
error_info,
created_at,
last_updated_at,
created_by,
Expand Down Expand Up @@ -457,7 +468,7 @@ LEFT JOIN (
//TODO: refactor to implement proper conflict resolution
private static final String INSERT_UPDATE = """
INSERT INTO traces (
id, project_id, workspace_id, name, start_time, end_time, input, output, metadata, tags, created_at, created_by, last_updated_by
id, project_id, workspace_id, name, start_time, end_time, input, output, metadata, tags, error_info, created_at, created_by, last_updated_by
)
SELECT
new_trace.id as id,
Expand Down Expand Up @@ -504,6 +515,11 @@ INSERT INTO traces (
notEmpty(old_trace.tags), old_trace.tags,
new_trace.tags
) as tags,
multiIf(
LENGTH(new_trace.error_info) > 0, new_trace.error_info,
LENGTH(old_trace.error_info) > 0, old_trace.error_info,
new_trace.error_info
) as error_info,
multiIf(
notEquals(old_trace.created_at, toDateTime64('1970-01-01 00:00:00.000', 9)) AND old_trace.created_at >= toDateTime64('1970-01-01 00:00:00.000', 9), old_trace.created_at,
new_trace.created_at
Expand All @@ -525,6 +541,7 @@ INSERT INTO traces (
<if(output)> :output <else> '' <endif> as output,
<if(metadata)> :metadata <else> '' <endif> as metadata,
<if(tags)> :tags <else> [] <endif> as tags,
<if(error_info)> :error_info <else> '' <endif> as error_info,
now64(9) as created_at,
:user_name as created_by,
:user_name as last_updated_by
Expand Down Expand Up @@ -752,6 +769,12 @@ private Statement buildInsertStatement(Trace trace, Connection connection, ST te
statement.bind("tags", new String[]{});
}

if (trace.errorInfo() != null) {
statement.bind("error_info", JsonUtils.readTree(trace.errorInfo()).toString());
} else {
statement.bind("error_info", "");
}

return statement;
}

Expand Down Expand Up @@ -806,6 +829,9 @@ private void bindUpdateParams(TraceUpdate traceUpdate, Statement statement) {
Optional.ofNullable(traceUpdate.metadata())
.ifPresent(metadata -> statement.bind("metadata", metadata.toString()));

Optional.ofNullable(traceUpdate.errorInfo())
.ifPresent(errorInfo -> statement.bind("error_info", JsonUtils.readTree(errorInfo).toString()));

Optional.ofNullable(traceUpdate.endTime())
.ifPresent(endTime -> statement.bind("end_time", endTime.toString()));
}
Expand All @@ -828,6 +854,9 @@ private ST buildUpdateTemplate(TraceUpdate traceUpdate, String update) {
Optional.ofNullable(traceUpdate.endTime())
.ifPresent(endTime -> template.add("end_time", endTime.toString()));

Optional.ofNullable(traceUpdate.errorInfo())
.ifPresent(errorInfo -> template.add("error_info", JsonUtils.readTree(errorInfo).toString()));

return template;
}

Expand Down Expand Up @@ -897,6 +926,10 @@ private Publisher<Trace> mapToDto(Result result) {
.totalEstimatedCost(row.get("total_estimated_cost", BigDecimal.class).compareTo(BigDecimal.ZERO) == 0
? null
: row.get("total_estimated_cost", BigDecimal.class))
.errorInfo(Optional.ofNullable(row.get("error_info", String.class))
.filter(str -> !str.isBlank())
.map(errorInfo -> JsonUtils.readValue(errorInfo, ERROR_INFO_TYPE))
.orElse(null))
.createdAt(row.get("created_at", Instant.class))
.lastUpdatedAt(row.get("last_updated_at", Instant.class))
.createdBy(row.get("created_by", String.class))
Expand Down Expand Up @@ -1060,7 +1093,9 @@ private Publisher<? extends Result> insert(List<Trace> traces, Connection connec
.bind("input" + i, getOrDefault(trace.input()))
.bind("output" + i, getOrDefault(trace.output()))
.bind("metadata" + i, getOrDefault(trace.metadata()))
.bind("tags" + i, trace.tags() != null ? trace.tags().toArray(String[]::new) : new String[]{});
.bind("tags" + i, trace.tags() != null ? trace.tags().toArray(String[]::new) : new String[]{})
.bind("error_info" + i,
trace.errorInfo() != null ? JsonUtils.readTree(trace.errorInfo()).toString() : "");

if (trace.endTime() != null) {
statement.bind("end_time" + i, trace.endTime().toString());
Expand Down
Loading
Loading