Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OPIK-42 Compare only dataset items with matching experiment items #168

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.comet.opik.utils.AsyncUtils;
import com.comet.opik.utils.JsonUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
Expand Down Expand Up @@ -52,9 +51,6 @@ public interface DatasetItemDAO {

Mono<DatasetItemPage> getItems(DatasetItemSearchCriteria datasetItemSearchCriteria, int page, int size);

Mono<DatasetItemPage> getItemsFromSingleExperiment(
DatasetItemSearchCriteria datasetItemSearchCriteria, int page, int size);

Mono<DatasetItem> get(UUID id);

Flux<DatasetItem> getItems(UUID datasetId, int limit, UUID lastRetrievedId);
Expand Down Expand Up @@ -206,7 +202,7 @@ LEFT JOIN (
/**
* Counts dataset items only if there's a matching experiment item.
*/
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT_COUNT = """
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_COUNT = """
SELECT
COUNT(DISTINCT di.id) AS count
FROM (
Expand All @@ -222,7 +218,7 @@ INNER JOIN (
SELECT
dataset_item_id
FROM experiment_items
WHERE experiment_id = :experimentId
WHERE experiment_id in :experimentIds
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
Expand Down Expand Up @@ -272,114 +268,6 @@ INNER JOIN (
* }
*/
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS = """
SELECT
di.id AS id,
di.input AS input,
di.expected_output AS expected_output,
di.metadata AS metadata,
di.trace_id AS trace_id,
di.span_id AS span_id,
di.source AS source,
di.created_at AS created_at,
di.last_updated_at AS last_updated_at,
di.created_by AS created_by,
di.last_updated_by AS last_updated_by,
groupArray(tuple(
ei.id,
ei.experiment_id,
ei.dataset_item_id,
ei.trace_id,
tfs.input,
tfs.output,
tfs.feedback_scores_array,
ei.created_at,
ei.last_updated_at,
ei.created_by,
ei.last_updated_by
)) AS experiment_items_array
FROM (
SELECT
*
FROM dataset_items
WHERE dataset_id = :datasetId
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
) AS di
LEFT JOIN (
SELECT
*
FROM experiment_items
WHERE experiment_id in :experimentIds
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
) AS ei ON di.id = ei.dataset_item_id
LEFT JOIN (
SELECT
t.id,
t.input,
t.output,
groupArray(tuple(
fs.entity_id,
fs.name,
fs.category_name,
fs.value,
fs.reason,
fs.source
)) AS feedback_scores_array
FROM (
SELECT
id,
input,
output
FROM traces
WHERE workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
) AS t
LEFT JOIN (
SELECT
entity_id,
name,
category_name,
value,
reason,
source
FROM feedback_scores
WHERE entity_type = :entityType
AND workspace_id = :workspace_id
ORDER BY entity_id DESC, last_updated_at DESC
LIMIT 1 BY entity_id, name
) AS fs ON t.id = fs.entity_id
GROUP BY
t.id,
t.input,
t.output
) AS tfs ON ei.trace_id = tfs.id
GROUP BY
di.id,
di.input,
di.expected_output,
di.metadata,
di.trace_id,
di.span_id,
di.source,
di.created_at,
di.last_updated_at,
di.created_by,
di.last_updated_by
ORDER BY di.id DESC, di.last_updated_at DESC
LIMIT :limit OFFSET :offset
;
""";

/**
* Same relationships as the query above, but with two logical changes:
* - Only accepts a single experiment id.
* - Only returns dataset items if there are matching experiment items for the given experiment id.
*/
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT = """
SELECT
di.id AS id,
di.input AS input,
Expand Down Expand Up @@ -418,7 +306,7 @@ INNER JOIN (
SELECT
*
FROM experiment_items
WHERE experiment_id = :experimentId
WHERE experiment_id in :experimentIds
AND workspace_id = :workspace_id
ORDER BY id DESC, last_updated_at DESC
LIMIT 1 BY id
Expand Down Expand Up @@ -723,66 +611,27 @@ public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int siz
}

@Override
public Mono<DatasetItemPage> getItems(@NonNull DatasetItemSearchCriteria datasetItemSearchCriteria, int page,
int size) {
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return makeMonoContextAware(
(userName, workspaceName,
workspaceId) -> asyncTemplate
.nonTransaction(connection -> Flux
.from(connection.createStatement(SELECT_DATASET_ITEMS_COUNT)
.bind("datasetId", datasetItemSearchCriteria.datasetId())
.bind("workspace_id", workspaceId)
.execute())
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, Long.class)))
.reduce(0L, Long::sum)
.flatMap(
count -> Flux
.from(connection
.createStatement(
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS)
.bind("datasetId",
datasetItemSearchCriteria.datasetId())
.bind("experimentIds",
datasetItemSearchCriteria.experimentIds())
.bind("entityType",
datasetItemSearchCriteria.entityType()
.getType())
.bind("workspace_id", workspaceId)
.bind("limit", size)
.bind("offset", (page - 1) * size)
.execute())
.flatMap(this::mapItem)
.collectList()
.flatMap(items -> Mono.just(new DatasetItemPage(items, page,
items.size(), count))))));
}

@Override
public Mono<DatasetItemPage> getItemsFromSingleExperiment(
public Mono<DatasetItemPage> getItems(
@NonNull DatasetItemSearchCriteria datasetItemSearchCriteria, int page, int size) {
Preconditions.checkArgument(datasetItemSearchCriteria.experimentIds().size() == 1);
log.info("Finding dataset items with experiment items from single experiment by '{}', page '{}', size '{}'",
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
var experimentId = datasetItemSearchCriteria.experimentIds().stream().toList().getFirst();
return makeMonoContextAware((userName, workspaceName, workspaceId) -> asyncTemplate.nonTransaction(
connection -> Flux
.from(connection
.createStatement(
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT_COUNT)
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_COUNT)
.bind("datasetId", datasetItemSearchCriteria.datasetId())
.bind("experimentId", experimentId)
.bind("experimentIds", datasetItemSearchCriteria.experimentIds())
.bind("workspace_id", workspaceId)
.execute())
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, Long.class)))
.reduce(0L, Long::sum)
.flatMap(count -> Flux
.from(connection
.createStatement(
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_FROM_SINGLE_EXPERIMENT)
SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS)
.bind("datasetId", datasetItemSearchCriteria.datasetId())
.bind("experimentId", experimentId)
.bind("experimentIds", datasetItemSearchCriteria.experimentIds())
.bind("entityType", datasetItemSearchCriteria.entityType().getType())
.bind("workspace_id", workspaceId)
.bind("limit", size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,8 @@ public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int siz
@Override
public Mono<DatasetItemPage> getItems(
int page, int size, @NonNull DatasetItemSearchCriteria datasetItemSearchCriteria) {
if (datasetItemSearchCriteria.experimentIds().size() == 1) {
log.info("Finding dataset items with experiment items from single experiment by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return dao.getItemsFromSingleExperiment(datasetItemSearchCriteria, page, size);
} else {
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return dao.getItems(datasetItemSearchCriteria, page, size);
}
log.info("Finding dataset items with experiment items by '{}', page '{}', size '{}'",
datasetItemSearchCriteria, page, size);
return dao.getItems(datasetItemSearchCriteria, page, size);
}
}
Loading