[OPIK-48] e2e tests (#192)
* Add e2e test for running tracked function

* Replace low-level API client usage with Opik class usage in e2e tests. Add test for dataset creation and population.

* Add missing files, add e2e test for dataset

* Add comparison for the amount of dataset items

* Refactor testlib

* Refactor e2e tests, implement new feedback tests and experiment test

* Remove debug statement from opik.__init__, add evaluate to __all__

* Update experiment e2e test, add EvaluationResult object as return value of evaluate

* Make e2e tests run against any configured backend

* Fix import error
alexkuzmik authored Sep 6, 2024
1 parent 7ab906b commit 81dee5e
Showing 25 changed files with 967 additions and 209 deletions.
4 changes: 3 additions & 1 deletion sdks/python/examples/manual_chain_building.py
@@ -1,8 +1,10 @@
import opik
import os

os.environ["OPIK_URL_OVERRIDE"] = "http://localhost:5173/api"
client = opik.Opik()

trace = client.trace(name="trace-name")
trace = client.trace()
span1 = trace.span(name="span-1")
span2 = span1.span(name="span-2")
span2.end()
2 changes: 2 additions & 0 deletions sdks/python/src/opik/__init__.py
@@ -7,12 +7,14 @@
from . import _logging
from . import package_version
from .plugins.pytest.decorator import llm_unit
from .evaluation import evaluate

_logging.setup()

__version__ = package_version.VERSION
__all__ = [
"__version__",
"evaluate",
"track",
"flush_tracker",
"Opik",
8 changes: 4 additions & 4 deletions sdks/python/src/opik/api_objects/experiment/experiment.py
@@ -18,16 +18,16 @@ def __init__(
dataset_name: str,
rest_client: rest_api_client.OpikApi,
) -> None:
self._id = id
self._name = name
self._dataset_name = dataset_name
self.id = id
self.name = name
self.dataset_name = dataset_name
self._rest_client = rest_client

def insert(self, experiment_items: List[experiment_item.ExperimentItem]) -> None:
rest_experiment_items = [
rest_experiment_item.ExperimentItem(
id=item.id if item.id is not None else helpers.generate_id(),
experiment_id=self._id,
experiment_id=self.id,
dataset_item_id=item.dataset_item_id,
trace_id=item.trace_id,
)
8 changes: 7 additions & 1 deletion sdks/python/src/opik/api_objects/opik_client.py
@@ -17,7 +17,7 @@
)
from ..message_processing import streamer_constructors, messages
from ..rest_api import client as rest_api_client
from ..rest_api.types import dataset_public
from ..rest_api.types import dataset_public, trace_public, span_public
from .. import datetime_helpers, config, httpx_client


@@ -377,6 +377,12 @@ def flush(self, timeout: Optional[int] = None) -> None:
timeout = timeout if timeout is not None else self._flush_timeout
self._streamer.flush(timeout)

def get_trace_content(self, id: str) -> trace_public.TracePublic:
return self._rest_client.traces.get_trace_by_id(id)

def get_span_content(self, id: str) -> span_public.SpanPublic:
return self._rest_client.spans.get_span_by_id(id)


@functools.lru_cache()
def get_client_cached() -> Opik:
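The two getters above expose read access to logged traces and spans through the `Opik` client, which the new e2e verifiers rely on to assert against the backend. A minimal sketch of that pattern; the trace name is illustrative, and it assumes the object returned by `client.trace()` exposes an `id` attribute and that `TracePublic` carries the trace name:

```python
import opik

client = opik.Opik()
trace = client.trace(name="e2e-smoke-trace")
client.flush()  # ensure the trace message reaches the backend before reading it back

# Read the persisted trace via the new getter and check what was logged.
trace_content = client.get_trace_content(id=trace.id)
assert trace_content.name == "e2e-smoke-trace"
```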
12 changes: 12 additions & 0 deletions sdks/python/src/opik/evaluation/evaluation_result.py
@@ -0,0 +1,12 @@
from typing import List

import dataclasses

from . import test_result


@dataclasses.dataclass
class EvaluationResult:
experiment_id: str
experiment_name: str
test_results: List[test_result.TestResult]
18 changes: 9 additions & 9 deletions sdks/python/src/opik/evaluation/evaluator.py
@@ -7,7 +7,7 @@
from ..api_objects.experiment import experiment_item
from ..api_objects import opik_client

from . import tasks_scorer, test_result, scores_logger, report
from . import tasks_scorer, scores_logger, report, evaluation_result


def evaluate(
@@ -17,8 +17,7 @@ def evaluate(
experiment_name: str,
verbose: int = 1,
task_threads: int = 16,
scoring_threads: int = 16,
) -> List[test_result.TestResult]:
) -> evaluation_result.EvaluationResult:
"""
Performs task evaluation on a given dataset.
@@ -38,11 +37,6 @@
threads are created, all tasks are executed sequentially in the current thread.
Use more than 1 worker if your task object is compatible with sharing across threads.
scoring_threads: amount of thread workers to compute metric scores. If set to 1,
no additional threads are created, all metrics are computed in the
current thread sequentially.
Use more than 1 worker if your metrics are compatible with sharing across threads.
"""
client = opik_client.get_client_cached()
start_time = time.time()
@@ -77,4 +71,10 @@
experiment.insert(experiment_items=experiment_items)

client.flush()
return test_results

evaluation_result_ = evaluation_result.EvaluationResult(
experiment_id=experiment.id,
experiment_name=experiment.name,
test_results=test_results,
)
return evaluation_result_
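With this change `evaluate` returns an `EvaluationResult` instead of a bare list of test results, so callers can refer back to the experiment that was created. A rough usage sketch under the same conventions as the e2e tests below (the dataset item keys and the `Equals` metric's `output`/`reference` fields mirror those tests; dataset and experiment names are illustrative):

```python
import opik
from opik.evaluation import metrics

client = opik.Opik()
dataset = client.create_dataset("evaluate-result-demo")
dataset.insert(
    [{"input": {"question": "2 + 2?"}, "expected_output": {"output": "4"}}]
)

def task(item):
    # Echo the expected answer so the Equals metric scores 1.0.
    return {"output": "4", "reference": item.expected_output["output"]}

result = opik.evaluate(
    dataset=dataset,
    task=task,
    scoring_metrics=[metrics.Equals()],
    experiment_name="evaluate-result-demo",
)

print(result.experiment_id)      # id of the experiment created by this run
print(result.experiment_name)    # its name, as stored on the Experiment object
print(len(result.test_results))  # one TestResult per dataset item
```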
49 changes: 47 additions & 2 deletions sdks/python/src/opik/synchronization.py
@@ -1,9 +1,13 @@
import time
from typing import Callable, Optional
import logging
from typing import Callable, Optional, Any


LOGGER = logging.getLogger(__name__)


def wait_for_done(
check_function: Callable,
check_function: Callable[[], bool],
timeout: Optional[float],
progress_callback: Optional[Callable] = None,
sleep_time: float = 1,
@@ -19,3 +23,44 @@ def wait_for_done(
end_sleep_time = time.time() + sleep_time
while check_function() is False and time.time() < end_sleep_time:
time.sleep(sleep_time / 20.0)


def until(
function: Callable[[], bool],
sleep: float = 0.5,
max_try_seconds: float = 10,
allow_errors: bool = False,
) -> bool:
start_time = time.time()
while True:
try:
if function():
break
except Exception:
LOGGER.debug(
f"{function.__name__} raised error in 'until' function.", exc_info=True
)
if not allow_errors:
raise
finally:
if (time.time() - start_time) > max_try_seconds:
return False
time.sleep(sleep)
return True


def try_get_until(
function: Callable[[], Any], sleep: float = 0.5, max_try_seconds: float = 10
) -> Any:
"""
Calls `function` repeatedly until it returns without raising; once `max_try_seconds` is exceeded, the last exception is re-raised.
"""
start_time = time.time()
while True:
try:
return function()
except Exception:
if (time.time() - start_time) > max_try_seconds:
raise
time.sleep(sleep)
return True
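Both new helpers are polling utilities for the e2e suite: `until` retries a boolean check until it passes or `max_try_seconds` elapses, and `try_get_until` retries a call that may raise while the backend catches up. A hedged sketch of how they might be combined with the new content getters; the `id` attributes on the trace and span objects are assumptions, not shown in this diff:

```python
import opik
from opik import synchronization

client = opik.Opik()
trace = client.trace(name="sync-demo")
span = trace.span(name="sync-demo-span")
span.end()
client.flush()

# Retry the read until the trace is persisted; re-raises after max_try_seconds.
trace_content = synchronization.try_get_until(
    lambda: client.get_trace_content(id=trace.id), sleep=0.5, max_try_seconds=10
)

# Poll a boolean condition; allow_errors=True tolerates transient failures
# (e.g. a 404 while the span is still being written) instead of re-raising.
assert synchronization.until(
    lambda: client.get_span_content(id=span.id) is not None,
    allow_errors=True,
)
```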
8 changes: 5 additions & 3 deletions sdks/python/tests/conftest.py
@@ -1,7 +1,7 @@
import pytest
from opik import context_storage
from opik.api_objects import opik_client
from .testlib import fake_message_processor
from .testlib import backend_emulator_message_processor
from opik.message_processing import streamer_constructors


@@ -12,7 +12,7 @@ def clear_context_storage():


@pytest.fixture(autouse=True)
def shutdown_cached_client():
def shutdown_cached_client_after_test():
yield
if opik_client.get_client_cached.cache_info().currsize > 0:
opik_client.get_client_cached().end()
@@ -22,7 +22,9 @@ def shutdown_cached_client():
@pytest.fixture
def fake_streamer():
try:
fake_message_processor_ = fake_message_processor.FakeMessageProcessor()
fake_message_processor_ = (
backend_emulator_message_processor.BackendEmulatorMessageProcessor()
)
streamer = streamer_constructors.construct_streamer(
message_processor=fake_message_processor_,
n_consumers=1,
Empty file.
43 changes: 43 additions & 0 deletions sdks/python/tests/e2e/conftest.py
@@ -0,0 +1,43 @@
import os
import random
import string

import opik
import opik.api_objects.opik_client

import pytest


def _random_chars(n: int = 6) -> str:
return "".join(random.choice(string.ascii_letters) for _ in range(n))


@pytest.fixture(scope="session")
def configure_e2e_tests_env():
os.environ["OPIK_PROJECT_NAME"] = "e2e-tests"
# os.environ["OPIK_URL_OVERRIDE"] = "http://localhost:5173/api"


@pytest.fixture()
def opik_client(configure_e2e_tests_env, shutdown_cached_client_after_test):
opik_client_ = opik.api_objects.opik_client.Opik()

yield opik_client_

opik_client_.end()


@pytest.fixture
def dataset_name(opik_client: opik.Opik):
name = f"e2e-tests-dataset-{ _random_chars()}"
yield name

opik_client.delete_dataset(name)


@pytest.fixture
def experiment_name(opik_client: opik.Opik):
name = f"e2e-tests-experiment-{ _random_chars()}"
yield name

# TODO: delete the experiment
50 changes: 50 additions & 0 deletions sdks/python/tests/e2e/test_dataset.py
@@ -0,0 +1,50 @@
import opik
from . import verifiers
from opik.api_objects.dataset import dataset_item


def test_create_and_populate_dataset__happyflow(
opik_client: opik.Opik, dataset_name: str
):
DESCRIPTION = "E2E test dataset"

dataset = opik_client.create_dataset(dataset_name, description=DESCRIPTION)

dataset.insert(
[
{
"input": {"question": "What is the of capital of France?"},
"expected_output": {"output": "Paris"},
},
{
"input": {"question": "What is the of capital of Germany?"},
"expected_output": {"output": "Berlin"},
},
{
"input": {"question": "What is the of capital of Poland?"},
"expected_output": {"output": "Warsaw"},
},
]
)

EXPECTED_DATASET_ITEMS = [
dataset_item.DatasetItem(
input={"question": "What is the of capital of France?"},
expected_output={"output": "Paris"},
),
dataset_item.DatasetItem(
input={"question": "What is the of capital of Germany?"},
expected_output={"output": "Berlin"},
),
dataset_item.DatasetItem(
input={"question": "What is the of capital of Poland?"},
expected_output={"output": "Warsaw"},
),
]

verifiers.verify_dataset(
opik_client=opik_client,
name=dataset_name,
description=DESCRIPTION,
dataset_items=EXPECTED_DATASET_ITEMS,
)
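The `verifiers` helper module used above is one of the 25 changed files but is not shown in this excerpt. A purely hypothetical sketch of what `verify_dataset` might do, assuming the client exposes `get_dataset(name)` and that the returned dataset can list its items via `get_all_items()` (neither call appears in this diff):

```python
from typing import List

import opik
from opik import synchronization
from opik.api_objects.dataset import dataset_item


def verify_dataset(
    opik_client: opik.Opik,
    name: str,
    description: str,
    dataset_items: List[dataset_item.DatasetItem],
) -> None:
    # Hypothetical: poll until the backend reports the expected number of items,
    # tolerating transient errors while the inserts are still being processed.
    assert synchronization.until(
        lambda: len(opik_client.get_dataset(name).get_all_items())
        == len(dataset_items),
        allow_errors=True,
    ), f"Dataset '{name}' never reached {len(dataset_items)} items"

    # Compare inputs only; description checks are omitted in this sketch.
    actual_items = opik_client.get_dataset(name).get_all_items()
    assert {str(item.input) for item in actual_items} == {
        str(item.input) for item in dataset_items
    }
```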
77 changes: 77 additions & 0 deletions sdks/python/tests/e2e/test_experiment.py
@@ -0,0 +1,77 @@
import opik

from opik.api_objects.dataset import dataset_item
from opik.evaluation import metrics
from . import verifiers


def test_experiment_creation_via_evaluate_function__happyflow(
opik_client: opik.Opik, dataset_name: str, experiment_name: str
):
# TODO: this test is not finished, it only checks that the script is not failing

dataset = opik_client.create_dataset(dataset_name)

dataset.insert(
[
{
"input": {"question": "What is the of capital of France?"},
"expected_output": {"output": "Paris"},
},
{
"input": {"question": "What is the of capital of Germany?"},
"expected_output": {"output": "Berlin"},
},
{
"input": {"question": "What is the of capital of Poland?"},
"expected_output": {"output": "Warsaw"},
},
]
)

def task(item: dataset_item.DatasetItem):
if item.input == {"question": "What is the of capital of France?"}:
return {"output": "Paris", "reference": item.expected_output["output"]}
if item.input == {"question": "What is the of capital of Germany?"}:
return {"output": "Berlin", "reference": item.expected_output["output"]}
if item.input == {"question": "What is the of capital of Poland?"}:
return {"output": "Krakow", "reference": item.expected_output["output"]}

raise AssertionError(
f"Task received dataset item with an unexpected input: {item.input}"
)

equals_metric = metrics.Equals()
evaluation_result = opik.evaluate(
dataset=dataset,
task=task,
scoring_metrics=[equals_metric],
experiment_name=experiment_name,
)

opik.flush_tracker()

verifiers.verify_experiment(
opik_client=opik_client,
id=evaluation_result.experiment_id,
experiment_name=evaluation_result.experiment_name,
traces_amount=3, # one trace per dataset item
feedback_scores_amount=1, # an average value of all Equals metric scores
)

# TODO: check more content of the experiment
#
# EXPECTED_DATASET_ITEMS = [
# dataset_item.DatasetItem(
# input={"question": "What is the of capital of France?"},
# expected_output={"output": "Paris"},
# ),
# dataset_item.DatasetItem(
# input={"question": "What is the of capital of Germany?"},
# expected_output={"output": "Berlin"},
# ),
# dataset_item.DatasetItem(
# input={"question": "What is the of capital of Poland?"},
# expected_output={"output": "Warsaw"},
# ),
# ]