[OPIK-48] e2e tests #192

Merged
merged 10 commits on Sep 6, 2024
4 changes: 3 additions & 1 deletion sdks/python/examples/manual_chain_building.py
@@ -1,8 +1,10 @@
import opik
import os

os.environ["OPIK_URL_OVERRIDE"] = "http://localhost:5173/api"
client = opik.Opik()

trace = client.trace(name="trace-name")
trace = client.trace()
span1 = trace.span(name="span-1")
span2 = span1.span(name="span-2")
span2.end()
2 changes: 2 additions & 0 deletions sdks/python/src/opik/__init__.py
@@ -7,12 +7,14 @@
from . import _logging
from . import package_version
from .plugins.pytest.decorator import llm_unit
from .evaluation import evaluate

_logging.setup()

__version__ = package_version.VERSION
__all__ = [
"__version__",
"evaluate",
"track",
"flush_tracker",
"Opik",
8 changes: 4 additions & 4 deletions sdks/python/src/opik/api_objects/experiment/experiment.py
@@ -18,16 +18,16 @@ def __init__(
dataset_name: str,
rest_client: rest_api_client.OpikApi,
) -> None:
self._id = id
self._name = name
self._dataset_name = dataset_name
self.id = id
self.name = name
self.dataset_name = dataset_name
self._rest_client = rest_client

def insert(self, experiment_items: List[experiment_item.ExperimentItem]) -> None:
rest_experiment_items = [
rest_experiment_item.ExperimentItem(
id=item.id if item.id is not None else helpers.generate_id(),
experiment_id=self._id,
experiment_id=self.id,
dataset_item_id=item.dataset_item_id,
trace_id=item.trace_id,
)
8 changes: 7 additions & 1 deletion sdks/python/src/opik/api_objects/opik_client.py
@@ -17,7 +17,7 @@
)
from ..message_processing import streamer_constructors, messages
from ..rest_api import client as rest_api_client
from ..rest_api.types import dataset_public
from ..rest_api.types import dataset_public, trace_public, span_public
from .. import datetime_helpers, config, httpx_client


@@ -377,6 +377,12 @@ def flush(self, timeout: Optional[int] = None) -> None:
timeout = timeout if timeout is not None else self._flush_timeout
self._streamer.flush(timeout)

def get_trace_content(self, id: str) -> trace_public.TracePublic:
return self._rest_client.traces.get_trace_by_id(id)

def get_span_content(self, id: str) -> span_public.SpanPublic:
return self._rest_client.spans.get_span_by_id(id)


@functools.lru_cache()
def get_client_cached() -> Opik:
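Note (illustrative, not part of the diff): the two new accessors read a single trace or span back from the backend by id, which is what the e2e verifiers later in this PR rely on. A minimal usage sketch follows; the `.id` attribute on the created trace and the `.name` field on the returned TracePublic object are assumptions, not guaranteed by this excerpt.

import opik

client = opik.Opik()
trace = client.trace(name="e2e-trace")
client.flush()  # make sure the trace has been sent before reading it back

trace_content = client.get_trace_content(id=trace.id)  # TracePublic from the REST API
assert trace_content.name == "e2e-trace"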
12 changes: 12 additions & 0 deletions sdks/python/src/opik/evaluation/evaluation_result.py
@@ -0,0 +1,12 @@
from typing import List

import dataclasses

from . import test_result


@dataclasses.dataclass
class EvaluationResult:
experiment_id: str
experiment_name: str
test_results: List[test_result.TestResult]
18 changes: 9 additions & 9 deletions sdks/python/src/opik/evaluation/evaluator.py
@@ -7,7 +7,7 @@
from ..api_objects.experiment import experiment_item
from ..api_objects import opik_client

from . import tasks_scorer, test_result, scores_logger, report
from . import tasks_scorer, scores_logger, report, evaluation_result


def evaluate(
@@ -17,8 +17,7 @@ def evaluate(
experiment_name: str,
verbose: int = 1,
task_threads: int = 16,
scoring_threads: int = 16,
) -> List[test_result.TestResult]:
) -> evaluation_result.EvaluationResult:
"""
Performs task evaluation on a given dataset.

Expand All @@ -38,11 +37,6 @@ def evaluate(
threads are created; all tasks are executed sequentially in the current thread.
Use more than 1 worker if your task object is compatible with sharing across threads.

scoring_threads: amount of thread workers to compute metric scores. If set to 1,
no additional threads are created, all metrics are computed in the
current thread sequentially.
Use more than 1 worker if your metrics are compatible with sharing across threads.
"""
client = opik_client.get_client_cached()
start_time = time.time()
@@ -77,4 +71,10 @@
experiment.insert(experiment_items=experiment_items)

client.flush()
return test_results

evaluation_result_ = evaluation_result.EvaluationResult(
experiment_id=experiment.id,
experiment_name=experiment.name,
test_results=test_results,
)
return evaluation_result_
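Note (illustrative, not part of the diff): evaluate() now returns an EvaluationResult instead of a bare list of test results, so callers can correlate a run with the experiment created for it. A small self-contained sketch of consuming the new return value; the dataset name, task, and experiment name below are placeholders.

import opik
from opik.evaluation import metrics

client = opik.Opik()
dataset = client.create_dataset("evaluate-return-value-demo")
dataset.insert(
    [{"input": {"question": "2 + 2?"}, "expected_output": {"output": "4"}}]
)

def task(item):
    # Echo the expected answer so the Equals metric scores 1.0 for this item.
    return {"output": "4", "reference": item.expected_output["output"]}

result = opik.evaluate(
    dataset=dataset,
    task=task,
    scoring_metrics=[metrics.Equals()],
    experiment_name="evaluate-return-value-demo",
)
print(result.experiment_id, result.experiment_name, len(result.test_results))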
49 changes: 47 additions & 2 deletions sdks/python/src/opik/synchronization.py
@@ -1,9 +1,13 @@
import time
from typing import Callable, Optional
import logging
from typing import Callable, Optional, Any


LOGGER = logging.getLogger(__name__)


def wait_for_done(
check_function: Callable,
check_function: Callable[[], bool],
timeout: Optional[float],
progress_callback: Optional[Callable] = None,
sleep_time: float = 1,
@@ -19,3 +23,44 @@ def wait_for_done(
end_sleep_time = time.time() + sleep_time
while check_function() is False and time.time() < end_sleep_time:
time.sleep(sleep_time / 20.0)


def until(
function: Callable[[], bool],
sleep: float = 0.5,
max_try_seconds: float = 10,
allow_errors: bool = False,
) -> bool:
start_time = time.time()
while True:
try:
if function():
break
except Exception:
LOGGER.debug(
f"{function.__name__} raised error in 'until' function.", exc_info=True
)
if not allow_errors:
raise
finally:
if (time.time() - start_time) > max_try_seconds:
return False
time.sleep(sleep)
return True


def try_get_until(
function: Callable[[], Any], sleep: float = 0.5, max_try_seconds: float = 10
) -> Any:
"""
Keeps calling `function` (sleeping `sleep` seconds between attempts) until it returns without raising; as soon as a call succeeds, its result is returned. If calls keep failing for longer than `max_try_seconds`, the last exception is re-raised.
"""
start_time = time.time()
while True:
try:
return function()
except Exception:
if (time.time() - start_time) > max_try_seconds:
raise
time.sleep(sleep)
return True
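Note (illustrative, not part of the diff): until() and try_get_until() are small polling helpers that let e2e assertions wait for the backend to catch up instead of failing immediately. A self-contained usage sketch; the fake readiness check below stands in for a real backend query.

from opik import synchronization

attempts = {"count": 0}

def backend_is_ready() -> bool:
    # Placeholder condition: pretend the backend becomes ready on the third poll.
    attempts["count"] += 1
    return attempts["count"] >= 3

# Poll a boolean condition, giving up (and returning False) after max_try_seconds.
if not synchronization.until(backend_is_ready, sleep=0.1, max_try_seconds=10):
    raise AssertionError("Condition was not met in time")

# try_get_until keeps retrying a callable that raises (e.g. a 404 right after an
# object was created) until it succeeds, re-raising once max_try_seconds elapses.
value = synchronization.try_get_until(lambda: {"status": "ok"}, max_try_seconds=5)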
8 changes: 5 additions & 3 deletions sdks/python/tests/conftest.py
@@ -1,7 +1,7 @@
import pytest
from opik import context_storage
from opik.api_objects import opik_client
from .testlib import fake_message_processor
from .testlib import backend_emulator_message_processor
from opik.message_processing import streamer_constructors


@@ -12,7 +12,7 @@ def clear_context_storage():


@pytest.fixture(autouse=True)
def shutdown_cached_client():
def shutdown_cached_client_after_test():
yield
if opik_client.get_client_cached.cache_info().currsize > 0:
opik_client.get_client_cached().end()
@@ -22,7 +22,9 @@ def shutdown_cached_client():
@pytest.fixture
def fake_streamer():
try:
fake_message_processor_ = fake_message_processor.FakeMessageProcessor()
fake_message_processor_ = (
backend_emulator_message_processor.BackendEmulatorMessageProcessor()
)
streamer = streamer_constructors.construct_streamer(
message_processor=fake_message_processor_,
n_consumers=1,
Empty file.
43 changes: 43 additions & 0 deletions sdks/python/tests/e2e/conftest.py
@@ -0,0 +1,43 @@
import os
import random
import string

import opik
import opik.api_objects.opik_client

import pytest


def _random_chars(n: int = 6) -> str:
return "".join(random.choice(string.ascii_letters) for _ in range(n))


@pytest.fixture(scope="session")
def configure_e2e_tests_env():
os.environ["OPIK_PROJECT_NAME"] = "e2e-tests"
# os.environ["OPIK_URL_OVERRIDE"] = "http://localhost:5173/api"


@pytest.fixture()
def opik_client(configure_e2e_tests_env, shutdown_cached_client_after_test):
opik_client_ = opik.api_objects.opik_client.Opik()

yield opik_client_

opik_client_.end()


@pytest.fixture
def dataset_name(opik_client: opik.Opik):
name = f"e2e-tests-dataset-{ _random_chars()}"
yield name

opik_client.delete_dataset(name)


@pytest.fixture
def experiment_name(opik_client: opik.Opik):
name = f"e2e-tests-experiment-{ _random_chars()}"
yield name

# TODO: delete the experiment
50 changes: 50 additions & 0 deletions sdks/python/tests/e2e/test_dataset.py
@@ -0,0 +1,50 @@
import opik
from . import verifiers
from opik.api_objects.dataset import dataset_item


def test_create_and_populate_dataset__happyflow(
opik_client: opik.Opik, dataset_name: str
):
DESCRIPTION = "E2E test dataset"

dataset = opik_client.create_dataset(dataset_name, description=DESCRIPTION)

dataset.insert(
[
{
"input": {"question": "What is the of capital of France?"},
"expected_output": {"output": "Paris"},
},
{
"input": {"question": "What is the of capital of Germany?"},
"expected_output": {"output": "Berlin"},
},
{
"input": {"question": "What is the of capital of Poland?"},
"expected_output": {"output": "Warsaw"},
},
]
)

EXPECTED_DATASET_ITEMS = [
dataset_item.DatasetItem(
input={"question": "What is the of capital of France?"},
expected_output={"output": "Paris"},
),
dataset_item.DatasetItem(
input={"question": "What is the of capital of Germany?"},
expected_output={"output": "Berlin"},
),
dataset_item.DatasetItem(
input={"question": "What is the of capital of Poland?"},
expected_output={"output": "Warsaw"},
),
]

verifiers.verify_dataset(
opik_client=opik_client,
name=dataset_name,
description=DESCRIPTION,
dataset_items=EXPECTED_DATASET_ITEMS,
)
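Note (illustrative, not part of the diff): the verifiers module used above is not included in this excerpt. A rough sketch of what verify_dataset could look like, assuming the client exposes a get_dataset(name) accessor and the dataset object exposes description and get_all_items(); all three names are hypothetical here.

from opik import synchronization

def verify_dataset(opik_client, name, description, dataset_items):
    # Poll until the dataset has propagated to the backend (get_dataset is hypothetical).
    if not synchronization.until(
        lambda: opik_client.get_dataset(name) is not None,
        allow_errors=True,
        max_try_seconds=10,
    ):
        raise AssertionError(f"Dataset {name} was not found in the backend")

    dataset = opik_client.get_dataset(name)
    assert dataset.description == description  # hypothetical attribute
    assert len(dataset.get_all_items()) == len(dataset_items)  # hypothetical accessor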
77 changes: 77 additions & 0 deletions sdks/python/tests/e2e/test_experiment.py
@@ -0,0 +1,77 @@
import opik

from opik.api_objects.dataset import dataset_item
from opik.evaluation import metrics
from . import verifiers


def test_experiment_creation_via_evaluate_function__happyflow(
opik_client: opik.Opik, dataset_name: str, experiment_name: str
):
# TODO: this test is not finished, it only checks that the script is not failing

dataset = opik_client.create_dataset(dataset_name)

dataset.insert(
[
{
"input": {"question": "What is the of capital of France?"},
"expected_output": {"output": "Paris"},
},
{
"input": {"question": "What is the of capital of Germany?"},
"expected_output": {"output": "Berlin"},
},
{
"input": {"question": "What is the of capital of Poland?"},
"expected_output": {"output": "Warsaw"},
},
]
)

def task(item: dataset_item.DatasetItem):
if item.input == {"question": "What is the of capital of France?"}:
return {"output": "Paris", "reference": item.expected_output["output"]}
if item.input == {"question": "What is the of capital of Germany?"}:
return {"output": "Berlin", "reference": item.expected_output["output"]}
if item.input == {"question": "What is the of capital of Poland?"}:
return {"output": "Krakow", "reference": item.expected_output["output"]}

raise AssertionError(
f"Task received dataset item with an unexpected input: {item.input}"
)

equals_metric = metrics.Equals()
evaluation_result = opik.evaluate(
dataset=dataset,
task=task,
scoring_metrics=[equals_metric],
experiment_name=experiment_name,
)

opik.flush_tracker()

verifiers.verify_experiment(
opik_client=opik_client,
id=evaluation_result.experiment_id,
experiment_name=evaluation_result.experiment_name,
traces_amount=3, # one trace per dataset item
feedback_scores_amount=1, # an average value of all Equals metric scores
)

# TODO: check more content of the experiment
#
# EXPECTED_DATASET_ITEMS = [
# dataset_item.DatasetItem(
# input={"question": "What is the of capital of France?"},
# expected_output={"output": "Paris"},
# ),
# dataset_item.DatasetItem(
# input={"question": "What is the of capital of Germany?"},
# expected_output={"output": "Berlin"},
# ),
# dataset_item.DatasetItem(
# input={"question": "What is the of capital of Poland?"},
# expected_output={"output": "Warsaw"},
# ),
# ]