Skip to content

Commit

Permalink
Add code review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Sep 12, 2024
1 parent 3b68fd1 commit e03b560
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 74 deletions.
15 changes: 15 additions & 0 deletions apps/opik-backend/src/main/java/com/comet/opik/api/Span.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -65,4 +66,18 @@ public static class Write {
public static class Public {
}
}


/**
 * Two spans are considered equal when their ids match; every other field is
 * deliberately ignored so that partially-populated spans with the same id
 * collapse to one entry in id-keyed sets and maps.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null) {
        return false;
    }
    if (getClass() != o.getClass()) {
        return false;
    }
    return Objects.equals(id, ((Span) o).id);
}

/**
 * Hashes on the span id only, keeping the hashCode contract consistent with
 * {@link #equals(Object)}, which also compares ids exclusively.
 */
@Override
public int hashCode() {
return Objects.hash(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@

import java.util.List;

public record SpanBatch(@NotNull @Size(min = 1, max = 2000) @JsonView( {
/**
 * Request payload for the bulk span-creation endpoint: a non-null list of
 * 1 to 1000 spans, each validated individually ({@code @Valid}) and
 * serialized under the {@code Span.View.Write} JSON view.
 */
public record SpanBulk(@NotNull @Size(min = 1, max = 1000) @JsonView( {
Span.View.Write.class}) @Valid List<Span> spans){
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.comet.opik.api.FeedbackScore;
import com.comet.opik.api.FeedbackScoreBatch;
import com.comet.opik.api.Span;
import com.comet.opik.api.SpanBatch;
import com.comet.opik.api.SpanBulk;
import com.comet.opik.api.SpanSearchCriteria;
import com.comet.opik.api.SpanUpdate;
import com.comet.opik.api.filter.FiltersFactory;
Expand Down Expand Up @@ -134,11 +134,11 @@ public Response create(
}

@POST
@Path("/batch")
@Path("/bulk")
@Operation(operationId = "createSpans", summary = "Create spans", description = "Create spans", responses = {
@ApiResponse(responseCode = "204", description = "No Content")})
public Response createSpans(
@RequestBody(content = @Content(schema = @Schema(implementation = SpanBatch.class))) @JsonView(Span.View.Write.class) @NotNull @Valid SpanBatch spans) {
@RequestBody(content = @Content(schema = @Schema(implementation = SpanBulk.class))) @JsonView(Span.View.Write.class) @NotNull @Valid SpanBulk spans) {

spanService.create(spans)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
Expand Down
65 changes: 21 additions & 44 deletions apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.comet.opik.domain.filter.FilterStrategy;
import com.comet.opik.utils.JsonUtils;
import com.comet.opik.utils.TemplateUtils;
import com.google.common.base.Preconditions;
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.Trace;
import io.r2dbc.spi.Connection;
Expand All @@ -25,7 +26,6 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -49,7 +49,7 @@
@Slf4j
class SpanDAO {

private static final String PLAIN_INSERT = """
private static final String BULK_INSERT = """
INSERT INTO spans(
id,
project_id,
Expand Down Expand Up @@ -494,27 +494,25 @@ public Mono<Void> insert(@NonNull Span span) {
}

@Trace(dispatcher = true)
public Mono<Void> batchInsert(@NonNull List<Span> spans) {
return Mono.from(connectionFactory.create())
.flatMapMany(connection -> {
if (spans.isEmpty()) {
return Mono.just(0L);
}
public Mono<Long> bulkInsert(@NonNull List<Span> spans) {

return insert(spans, connection);
})
.then();
Preconditions.checkArgument(!spans.isEmpty(), "Spans list must not be empty");

return Mono.from(connectionFactory.create())
.flatMapMany(connection -> insert(spans, connection))
.flatMap(Result::getRowsUpdated)
.reduce(0L, Long::sum);
}

private Publisher<? extends Result> insert(Collection<Span> spans, Connection connection) {
private Publisher<? extends Result> insert(List<Span> spans, Connection connection) {

return makeMonoContextAware((userName, workspaceName, workspaceId) -> {
List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(spans.size());

var template = new ST(PLAIN_INSERT)
var template = new ST(BULK_INSERT)
.add("items", queryItems);

var statement = connection.createStatement(template.render());
Statement statement = connection.createStatement(template.render());

int i = 0;
for (Span span : spans) {
Expand All @@ -524,40 +522,22 @@ private Publisher<? extends Result> insert(Collection<Span> spans, Connection co
.bind("trace_id" + i, span.traceId())
.bind("name" + i, span.name())
.bind("type" + i, span.type().toString())
.bind("start_time" + i, span.startTime().toString());
.bind("start_time" + i, span.startTime().toString())
.bind("parent_span_id" + i, span.parentSpanId() != null ? span.parentSpanId() : "")
.bind("input" + i, span.input() != null ? span.input().toString() : "")
.bind("output" + i, span.output() != null ? span.output().toString() : "")
.bind("metadata" + i, span.metadata() != null ? span.metadata().toString() : "")
.bind("tags" + i, span.tags() != null ? span.tags().toArray(String[]::new) : new String[]{})
.bind("created_at" + i, span.createdAt().toString())
.bind("created_by" + i, userName)
.bind("last_updated_by" + i, userName);

if (span.parentSpanId() != null) {
statement.bind("parent_span_id" + i, span.parentSpanId());
} else {
statement.bind("parent_span_id" + i, "");
}
if (span.endTime() != null) {
statement.bind("end_time" + i, span.endTime().toString());
} else {
statement.bindNull("end_time" + i, String.class);
}

if (span.input() != null) {
statement.bind("input" + i, span.input().toString());
} else {
statement.bind("input" + i, "");
}
if (span.output() != null) {
statement.bind("output" + i, span.output().toString());
} else {
statement.bind("output" + i, "");
}
if (span.metadata() != null) {
statement.bind("metadata" + i, span.metadata().toString());
} else {
statement.bind("metadata" + i, "");
}
if (span.tags() != null) {
statement.bind("tags" + i, span.tags().toArray(String[]::new));
} else {
statement.bind("tags" + i, new String[]{});
}

if (span.usage() != null) {
Stream.Builder<String> keys = Stream.builder();
Stream.Builder<Integer> values = Stream.builder();
Expand All @@ -574,9 +554,6 @@ private Publisher<? extends Result> insert(Collection<Span> spans, Connection co
statement.bind("usage_values" + i, new Integer[]{});
}

statement.bind("created_at" + i, span.createdAt().toString());
statement.bind("created_by" + i, userName);
statement.bind("last_updated_by" + i, userName);
i++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.clickhouse.client.ClickHouseException;
import com.comet.opik.api.Project;
import com.comet.opik.api.Span;
import com.comet.opik.api.SpanBatch;
import com.comet.opik.api.SpanBulk;
import com.comet.opik.api.SpanSearchCriteria;
import com.comet.opik.api.SpanUpdate;
import com.comet.opik.api.error.EntityAlreadyExistsException;
Expand Down Expand Up @@ -252,7 +252,7 @@ public Mono<Boolean> validateSpanWorkspace(@NonNull String workspaceId, @NonNull
}

@Trace(dispatcher = true)
public Mono<Void> create(SpanBatch batch) {
public Mono<Void> create(SpanBulk batch) {

if (batch.spans().isEmpty()) {
return Mono.empty();
Expand All @@ -276,7 +276,7 @@ public Mono<Void> create(SpanBatch batch) {

return lockService.lockAll(spanIds, SPAN_KEY)
.flatMap(locks -> mergeSpans(spans, spanIds)
.flatMap(spanDAO::batchInsert)
.flatMap(spanDAO::bulkInsert)
.doFinally(signalType -> lockService.unlockAll(locks).subscribe())
.subscribeOn(Schedulers.boundedElastic()))
.then();
Expand Down Expand Up @@ -326,21 +326,7 @@ private Span mergeSpans(Span currentSpan, Span receivedSpan) {
throw new EntityAlreadyExistsException(new ErrorMessage(List.of(PARENT_SPAN_IS_MISMATCH)));
}

return Span.builder()
.id(currentSpan.id())
.projectId(currentSpan.projectId() != null ? currentSpan.projectId() : receivedSpan.projectId())
.traceId(currentSpan.traceId() != null ? currentSpan.traceId() : receivedSpan.traceId())
.parentSpanId(
currentSpan.parentSpanId() != null ? currentSpan.parentSpanId() : receivedSpan.parentSpanId())
.name(currentSpan.name() != null ? currentSpan.name() : receivedSpan.name())
.type(currentSpan.type() != null ? currentSpan.type() : receivedSpan.type())
.startTime(currentSpan.startTime() != null ? currentSpan.startTime() : receivedSpan.startTime())
.endTime(receivedSpan.endTime() != null ? receivedSpan.endTime() : currentSpan.endTime())
.input(receivedSpan.input() != null ? receivedSpan.input() : currentSpan.input())
.output(receivedSpan.output() != null ? receivedSpan.output() : currentSpan.output())
.metadata(receivedSpan.metadata() != null ? receivedSpan.metadata() : currentSpan.metadata())
.tags(receivedSpan.tags() != null ? receivedSpan.tags() : currentSpan.tags())
.usage(receivedSpan.usage() != null ? receivedSpan.usage() : currentSpan.usage())
return receivedSpan.toBuilder()
.createdAt(getInstant(currentSpan.createdAt(), receivedSpan.createdAt()))
.build();
}
Expand All @@ -355,7 +341,7 @@ private Instant getInstant(Instant current, Instant received) {
}
}

private List<Span> bindSpanToProjectAndId(SpanBatch batch, List<Project> projects) {
private List<Span> bindSpanToProjectAndId(SpanBulk batch, List<Project> projects) {
Map<String, Project> projectPerName = projects.stream()
.collect(Collectors.toMap(Project::name, Function.identity()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public String writeValueAsString(@NonNull Object value) {
}
}

public void writeValueAsString(ByteArrayOutputStream baos, @NonNull Object value) {
public void writeValueAsString(@NonNull ByteArrayOutputStream baos, @NonNull Object value) {
try {
MAPPER.writeValue(baos, value);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.comet.opik.api.Project;
import com.comet.opik.api.ScoreSource;
import com.comet.opik.api.Span;
import com.comet.opik.api.SpanBatch;
import com.comet.opik.api.SpanBulk;
import com.comet.opik.api.SpanUpdate;
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.api.filter.Field;
Expand Down Expand Up @@ -3254,11 +3254,11 @@ void batch__whenCreateSpansAndThenUpdateInSameRequest__thenReturnUpdated() {
void batch__whenCreateSpansWithConflicts__thenReturn409(List<Span> expectedSpans, String expectedError) {

try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI))
.path("batch")
.path("bulk")
.request()
.header(HttpHeaders.AUTHORIZATION, API_KEY)
.header(WORKSPACE_HEADER, TEST_WORKSPACE)
.post(Entity.json(new SpanBatch(expectedSpans)))) {
.post(Entity.json(new SpanBulk(expectedSpans)))) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(409);
assertThat(actualResponse.hasEntity()).isTrue();
Expand Down Expand Up @@ -3302,17 +3302,16 @@ public Stream<Arguments> batch__whenCreateSpansWithConflicts__thenReturn409() {
)
);
}

}

private void batchCreateAndAssert(List<Span> expectedSpans, String apiKey, String workspaceName) {

try (var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI))
.path("batch")
.path("bulk")
.request()
.header(HttpHeaders.AUTHORIZATION, apiKey)
.header(WORKSPACE_HEADER, workspaceName)
.post(Entity.json(new SpanBatch(expectedSpans)))) {
.post(Entity.json(new SpanBulk(expectedSpans)))) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(204);
assertThat(actualResponse.hasEntity()).isFalse();
Expand Down

0 comments on commit e03b560

Please sign in to comment.