Skip to content

Commit

Permalink
OPIK-42 Compare only dataset items with matching experiment items
Browse files Browse the repository at this point in the history
  • Loading branch information
andrescrz committed Sep 3, 2024
1 parent 68f096c commit 78cf242
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.comet.opik.utils.AsyncUtils;
import com.comet.opik.utils.JsonUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
Expand Down Expand Up @@ -52,9 +51,6 @@ public interface DatasetItemDAO {

Mono<DatasetItemPage> getItems(DatasetItemSearchCriteria datasetItemSearchCriteria, int page, int size);

Mono<DatasetItemPage> getItemsFromSingleExperiment(
DatasetItemSearchCriteria datasetItemSearchCriteria, int page, int size);

Mono<DatasetItem> get(UUID id);

Flux<DatasetItem> getItems(UUID datasetId, int limit, UUID lastRetrievedId);
Expand Down Expand Up @@ -206,7 +202,7 @@ LEFT JOIN (
/**
* Counts dataset items only if there's a matching experiment item.
*/
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT_COUNT = """
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_COUNT = """
SELECT
COUNT(DISTINCT di.id) AS count
FROM (
Expand All @@ -222,7 +218,7 @@ INNER JOIN (
SELECT
dataset_item_id
FROM experiment_items
WHERE experiment_id = :experimentId
WHERE experiment_id in :experimentIds
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
Expand Down Expand Up @@ -272,114 +268,6 @@ INNER JOIN (
* }
*/
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS = """
SELECT
di.id AS id,
di.input AS input,
di.expected_output AS expected_output,
di.metadata AS metadata,
di.trace_id AS trace_id,
di.span_id AS span_id,
di.source AS source,
di.created_at AS created_at,
di.last_updated_at AS last_updated_at,
di.created_by AS created_by,
di.last_updated_by AS last_updated_by,
groupArray(tuple(
ei.id,
ei.experiment_id,
ei.dataset_item_id,
ei.trace_id,
tfs.input,
tfs.output,
tfs.feedback_scores_array,
ei.created_at,
ei.last_updated_at,
ei.created_by,
ei.last_updated_by
)) AS experiment_items_array
FROM (
SELECT
*
FROM dataset_items
WHERE dataset_id = :datasetId
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
) AS di
LEFT JOIN (
SELECT
*
FROM experiment_items
WHERE experiment_id in :experimentIds
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
) AS ei ON di.id = ei.dataset_item_id
LEFT JOIN (
SELECT
t.id,
t.input,
t.output,
groupArray(tuple(
fs.entity_id,
fs.name,
fs.category_name,
fs.value,
fs.reason,
fs.source
)) AS feedback_scores_array
FROM (
SELECT
id,
input,
output
FROM traces
WHERE workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
) AS t
LEFT JOIN (
SELECT
entity_id,
name,
category_name,
value,
reason,
source
FROM feedback_scores
WHERE entity_type = :entityType
AND workspace_id = :workspace_id
ORDER BY entity_id DESC, last_updated_at DESC
LIMIT 1 BY entity_id, name
) AS fs ON t.id = fs.entity_id
GROUP BY
t.id,
t.input,
t.output
) AS tfs ON ei.trace_id = tfs.id
GROUP BY
di.id,
di.input,
di.expected_output,
di.metadata,
di.trace_id,
di.span_id,
di.source,
di.created_at,
di.last_updated_at,
di.created_by,
di.last_updated_by
ORDER BY di.id DESC, di.last_updated_at DESC
LIMIT :limit OFFSET :offset
;
""";

/**
* Same relationships as the query above, but with two logical changes:
* - Only accepts a single experiment id.
* - Only returns dataset items if there are matching experiment items for the given experiment id.
*/
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT = """
SELECT
di.id AS id,
di.input AS input,
Expand Down Expand Up @@ -418,7 +306,7 @@ INNER JOIN (
SELECT
*
FROM experiment_items
WHERE experiment_id = :experimentId
WHERE experiment_id in :experimentIds
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
Expand Down Expand Up @@ -723,66 +611,27 @@ public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int siz
}

@Override
public Mono<DatasetItemPage> getItems(@NonNull DatasetItemSearchCriteria datasetItemSearchCriteria, int page,
int size) {
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return makeMonoContextAware(
(userName, workspaceName,
workspaceId) -> asyncTemplate
.nonTransaction(connection -> Flux
.from(connection.createStatement(SELECT_DATASET_ITEMS_COUNT)
.bind("datasetId", datasetItemSearchCriteria.datasetId())
.bind("workspace_id", workspaceId)
.execute())
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, Long.class)))
.reduce(0L, Long::sum)
.flatMap(
count -> Flux
.from(connection
.createStatement(
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS)
.bind("datasetId",
datasetItemSearchCriteria.datasetId())
.bind("experimentIds",
datasetItemSearchCriteria.experimentIds())
.bind("entityType",
datasetItemSearchCriteria.entityType()
.getType())
.bind("workspace_id", workspaceId)
.bind("limit", size)
.bind("offset", (page - 1) * size)
.execute())
.flatMap(this::mapItem)
.collectList()
.flatMap(items -> Mono.just(new DatasetItemPage(items, page,
items.size(), count))))));
}

@Override
public Mono<DatasetItemPage> getItemsFromSingleExperiment(
public Mono<DatasetItemPage> getItems(
@NonNull DatasetItemSearchCriteria datasetItemSearchCriteria, int page, int size) {
Preconditions.checkArgument(datasetItemSearchCriteria.experimentIds().size() == 1);
log.info("Finding dataset items with experiment items from single experiment by '{}', page '{}', size '{}'",
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
var experimentId = datasetItemSearchCriteria.experimentIds().stream().toList().getFirst();
return makeMonoContextAware((userName, workspaceName, workspaceId) -> asyncTemplate.nonTransaction(
connection -> Flux
.from(connection
.createStatement(
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT_COUNT)
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_COUNT)
.bind("datasetId", datasetItemSearchCriteria.datasetId())
.bind("experimentId", experimentId)
.bind("experimentIds", datasetItemSearchCriteria.experimentIds())
.bind("workspace_id", workspaceId)
.execute())
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, Long.class)))
.reduce(0L, Long::sum)
.flatMap(count -> Flux
.from(connection
.createStatement(
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT)
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS)
.bind("datasetId", datasetItemSearchCriteria.datasetId())
.bind("experimentId", experimentId)
.bind("experimentIds", datasetItemSearchCriteria.experimentIds())
.bind("entityType", datasetItemSearchCriteria.entityType().getType())
.bind("workspace_id", workspaceId)
.bind("limit", size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,8 @@ public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int siz
@Override
public Mono<DatasetItemPage> getItems(
int page, int size, @NonNull DatasetItemSearchCriteria datasetItemSearchCriteria) {
if (datasetItemSearchCriteria.experimentIds().size() == 1) {
log.info("Finding dataset items with experiment items from single experiment by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return dao.getItemsFromSingleExperiment(datasetItemSearchCriteria, page, size);
} else {
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return dao.getItems(datasetItemSearchCriteria, page, size);
}
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return dao.getItems(datasetItemSearchCriteria, page, size);
}
}
Loading

0 comments on commit 78cf242

Please sign in to comment.