Skip to content

Commit

Permalink
[OPIK-138] Add last updated trace at field (#342)
Browse files Browse the repository at this point in the history
* [OPIK-138] Add last updated trace at field

* Fix tests
  • Loading branch information
thiagohora authored Oct 7, 2024
1 parent b4b57d1 commit e95988d
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import lombok.Builder;
Expand All @@ -29,7 +30,9 @@ public record Project(
@JsonView({Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant createdAt,
@JsonView({Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) String createdBy,
@JsonView({Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant lastUpdatedAt,
@JsonView({Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) String lastUpdatedBy){
@JsonView({Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) String lastUpdatedBy,
@JsonView({
Project.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) @Nullable Instant lastUpdatedTraceAt){

public static class View {
public static class Write {
Expand All @@ -47,5 +50,9 @@ public record ProjectPage(
@JsonView({Project.View.Public.class}) List<Project> content)
implements
com.comet.opik.api.Page<Project>{

public static ProjectPage empty(int page) {
return new ProjectPage(page, 0, 0, List.of());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.comet.opik.api.ExperimentItem;
import com.comet.opik.api.FeedbackScore;
import com.comet.opik.api.ScoreSource;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
import com.comet.opik.utils.JsonUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.inject.ImplementedBy;
Expand Down Expand Up @@ -337,7 +337,7 @@ LEFT JOIN (
;
""";

private final @NonNull TransactionTemplate asyncTemplate;
private final @NonNull TransactionTemplateAsync asyncTemplate;

@Override
@Trace(dispatcher = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

import static com.comet.opik.api.Dataset.DatasetPage;
import static com.comet.opik.domain.ExperimentItemDAO.ExperimentSummary;
import static com.comet.opik.infrastructure.db.TransactionTemplate.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplate.WRITE;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;
import static java.util.stream.Collectors.toMap;

@ImplementedBy(DatasetServiceImpl.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.List;
import java.util.UUID;

import static com.comet.opik.infrastructure.db.TransactionTemplate.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplate.WRITE;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;

@ImplementedBy(FeedbackDefinitionServiceImpl.class)
public interface FeedbackDefinitionService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.api.error.IdentifierMismatchException;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
import com.comet.opik.infrastructure.lock.LockService;
import com.comet.opik.utils.WorkspaceUtils;
import com.google.inject.ImplementedBy;
Expand All @@ -30,8 +30,8 @@
import java.util.function.Function;

import static com.comet.opik.domain.FeedbackScoreDAO.EntityType;
import static com.comet.opik.infrastructure.db.TransactionTemplate.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplate.WRITE;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;

Expand All @@ -58,7 +58,7 @@ class FeedbackScoreServiceImpl implements FeedbackScoreService {

private final @NonNull FeedbackScoreDAO dao;
private final @NonNull ru.vyarus.guicey.jdbi3.tx.TransactionTemplate syncTemplate;
private final @NonNull TransactionTemplate asyncTemplate;
private final @NonNull TransactionTemplateAsync asyncTemplate;
private final @NonNull IdGenerator idGenerator;
private final @NonNull LockService lockService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import com.comet.opik.api.Page;
import com.comet.opik.api.Project;
import com.comet.opik.api.Project.ProjectPage;
import com.comet.opik.api.ProjectCriteria;
import com.comet.opik.api.ProjectUpdate;
import com.comet.opik.api.error.CannotDeleteProjectException;
import com.comet.opik.api.error.EntityAlreadyExistsException;
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
import com.google.inject.ImplementedBy;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
Expand All @@ -21,12 +23,16 @@
import ru.vyarus.guicey.jdbi3.tx.TransactionTemplate;

import java.sql.SQLIntegrityConstraintViolationException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import static com.comet.opik.infrastructure.db.TransactionTemplate.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplate.WRITE;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;
import static java.util.stream.Collectors.toSet;

@ImplementedBy(ProjectServiceImpl.class)
public interface ProjectService {
Expand Down Expand Up @@ -58,10 +64,15 @@ public interface ProjectService {
@RequiredArgsConstructor(onConstructor_ = @Inject)
class ProjectServiceImpl implements ProjectService {

record ProjectRecordSet(List<Project> content, long total) {
}

private static final String PROJECT_ALREADY_EXISTS = "Project already exists";
private final @NonNull TransactionTemplate template;
private final @NonNull IdGenerator idGenerator;
private final @NonNull Provider<RequestContext> requestContext;
private final @NonNull TraceDAO traceDAO;
private final @NonNull TransactionTemplateAsync transactionTemplateAsync;

private NotFoundException createNotFoundError() {
String message = "Project not found";
Expand Down Expand Up @@ -89,14 +100,16 @@ private Project createProject(Project project, UUID projectId, String userName,
.build();

try {
return template.inTransaction(WRITE, handle -> {
template.inTransaction(WRITE, handle -> {

var repository = handle.attach(ProjectDAO.class);

repository.save(workspaceId, newProject);

return repository.findById(projectId, workspaceId);
return newProject;
});

return get(newProject.id(), workspaceId);
} catch (UnableToExecuteStatementException e) {
if (e.getCause() instanceof SQLIntegrityConstraintViolationException) {
throw newConflict();
Expand All @@ -117,7 +130,7 @@ public Project update(@NonNull UUID id, @NonNull ProjectUpdate projectUpdate) {
String workspaceId = requestContext.get().getWorkspaceId();

try {
return template.inTransaction(WRITE, handle -> {
template.inTransaction(WRITE, handle -> {

var repository = handle.attach(ProjectDAO.class);

Expand All @@ -130,9 +143,10 @@ public Project update(@NonNull UUID id, @NonNull ProjectUpdate projectUpdate) {
projectUpdate.description(),
userName);

return repository.findById(id, workspaceId);
return null;
});

return get(id, workspaceId);
} catch (UnableToExecuteStatementException e) {
if (e.getCause() instanceof SQLIntegrityConstraintViolationException) {
throw newConflict();
Expand All @@ -151,12 +165,20 @@ public Project get(@NonNull UUID id) {

@Override
public Project get(@NonNull UUID id, @NonNull String workspaceId) {
return template.inTransaction(READ_ONLY, handle -> {
Project project = template.inTransaction(READ_ONLY, handle -> {

var repository = handle.attach(ProjectDAO.class);

return repository.fetch(id, workspaceId).orElseThrow(this::createNotFoundError);
});

Map<UUID, Instant> lastUpdatedTraceAt = transactionTemplateAsync
.nonTransaction(connection -> traceDAO.getLastUpdatedTraceAt(Set.of(id), workspaceId, connection))
.block();

return project.toBuilder()
.lastUpdatedTraceAt(lastUpdatedTraceAt.get(id))
.build();
}

@Override
Expand Down Expand Up @@ -188,19 +210,37 @@ public void delete(@NonNull UUID id) {

@Override
public Page<Project> find(int page, int size, @NonNull ProjectCriteria criteria) {

String workspaceId = requestContext.get().getWorkspaceId();

return template.inTransaction(READ_ONLY, handle -> {
ProjectRecordSet projectRecordSet = template.inTransaction(READ_ONLY, handle -> {

ProjectDAO repository = handle.attach(ProjectDAO.class);

int offset = (page - 1) * size;

List<Project> projects = repository.find(size, offset, workspaceId, criteria.projectName());

return new Project.ProjectPage(page, projects.size(),
repository.findCount(workspaceId, criteria.projectName()), projects);
return new ProjectRecordSet(
repository.find(size, offset, workspaceId, criteria.projectName()),
repository.findCount(workspaceId, criteria.projectName()));
});

if (projectRecordSet.content().isEmpty()) {
return ProjectPage.empty(page);
}

Map<UUID, Instant> projectLastUpdatedTraceAtMap = transactionTemplateAsync.nonTransaction(connection -> {
Set<UUID> projectIds = projectRecordSet.content().stream().map(Project::id).collect(toSet());
return traceDAO.getLastUpdatedTraceAt(projectIds, workspaceId, connection);
}).block();

List<Project> projects = projectRecordSet.content()
.stream()
.map(project -> project.toBuilder()
.lastUpdatedTraceAt(projectLastUpdatedTraceAtMap.get(project.id()))
.build())
.toList();

return new ProjectPage(page, projects.size(), projectRecordSet.total(), projects);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.time.Instant;
import java.util.Arrays;
Expand Down Expand Up @@ -68,6 +69,9 @@ interface TraceDAO {
Mono<Long> batchInsert(List<Trace> traces, Connection connection);

Flux<WorkspaceTraceCount> countTracesPerWorkspace(Connection connection);

Mono<Map<UUID, Instant>> getLastUpdatedTraceAt(@NonNull Set<UUID> projectIds, @NonNull String workspaceId,
@NonNull Connection connection);
}

@Slf4j
Expand Down Expand Up @@ -494,6 +498,17 @@ LEFT JOIN (
;
""";

private static final String SELECT_TRACE_LAST_UPDATED_AT = """
SELECT
t.project_id as project_id,
MAX(t.last_updated_at) as last_updated_at
FROM traces t
WHERE t.workspace_id = :workspace_id
AND t.project_id IN :project_ids
GROUP BY t.project_id
;
""";

private final @NonNull FeedbackScoreDAO feedbackScoreDAO;
private final @NonNull FilterQueryBuilder filterQueryBuilder;

Expand Down Expand Up @@ -886,4 +901,25 @@ public Flux<WorkspaceTraceCount> countTracesPerWorkspace(Connection connection)
.workspace(row.get("workspace_id", String.class))
.traceCount(row.get("trace_count", Integer.class)).build()));
}

@Override
public Mono<Map<UUID, Instant>> getLastUpdatedTraceAt(
@NonNull Set<UUID> projectIds, @NonNull String workspaceId, @NonNull Connection connection) {

log.info("Getting last updated trace at for projectIds {}", Arrays.toString(projectIds.toArray()));

var statement = connection.createStatement(SELECT_TRACE_LAST_UPDATED_AT)
.bind("project_ids", projectIds.toArray(UUID[]::new))
.bind("workspace_id", workspaceId);

return Mono.from(statement.execute())
.flatMapMany(result -> result.map((row, rowMetadata) -> Map.entry(row.get("project_id", UUID.class),
row.get("last_updated_at", Instant.class))))
.collectMap(Map.Entry::getKey, Map.Entry::getValue)
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
log.info("Got last updated trace at for projectIds {}", Arrays.toString(projectIds.toArray()));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.api.error.IdentifierMismatchException;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
import com.comet.opik.infrastructure.lock.LockService;
import com.comet.opik.utils.AsyncUtils;
import com.comet.opik.utils.WorkspaceUtils;
Expand Down Expand Up @@ -73,7 +73,7 @@ class TraceServiceImpl implements TraceService {
private final @NonNull TraceDAO dao;
private final @NonNull SpanDAO spanDAO;
private final @NonNull FeedbackScoreDAO feedbackScoreDAO;
private final @NonNull TransactionTemplate template;
private final @NonNull TransactionTemplateAsync template;
private final @NonNull ProjectService projectService;
private final @NonNull IdGenerator idGenerator;
private final @NonNull LockService lockService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Optional;

import static com.comet.opik.infrastructure.db.TransactionTemplate.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplate.WRITE;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;

@ImplementedBy(UsageReportServiceImpl.class)
interface UsageReportService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@RequiredArgsConstructor(onConstructor_ = @Inject)
public class ClickHouseHealthyCheck extends NamedHealthCheck {

private final @NonNull TransactionTemplate template;
private final @NonNull TransactionTemplateAsync template;

@Override
public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public String getDatabaseName() {

@Provides
@Singleton
public TransactionTemplate getTransactionTemplate(ConnectionFactory connectionFactory) {
return new TransactionTemplateImpl(connectionFactory);
public TransactionTemplateAsync getTransactionTemplate(ConnectionFactory connectionFactory) {
return new TransactionTemplateAsyncImpl(connectionFactory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import reactor.core.publisher.Mono;
import ru.vyarus.guicey.jdbi3.tx.TxConfig;

public interface TransactionTemplate {
public interface TransactionTemplateAsync {

TxConfig WRITE = new TxConfig().readOnly(false);
TxConfig READ_ONLY = new TxConfig().readOnly(true);
Expand All @@ -26,7 +26,7 @@ interface NoTransactionStream<T> {
}

@RequiredArgsConstructor
class TransactionTemplateImpl implements TransactionTemplate {
class TransactionTemplateAsyncImpl implements TransactionTemplateAsync {

private final ConnectionFactory connectionFactory;

Expand Down
Loading

0 comments on commit e95988d

Please sign in to comment.