From 375aaf24023791480138e7806a93277b9e48554e Mon Sep 17 00:00:00 2001 From: Aliaksandr Kuzmik <98702584+alexkuzmik@users.noreply.github.com> Date: Mon, 2 Sep 2024 16:36:15 +0200 Subject: [PATCH] Dataset bugfixes, pytest integration bugfixes, evaluation improvements (#145) * Fix dataset bugs, unite tasks running and scoring, update default base url, fix bug with llm_unit not working when pytest nodeid string contains more than one space * Add missing files * Add missing init file * Always convert reason to string in hallucination metric * Fix lint errors * Stop installing sdk in editable mode during CI tests --- .github/workflows/lib-langchain-tests.yml | 2 +- .github/workflows/lib-openai-tests.yml | 2 +- .github/workflows/python_sdk_unit_tests.yml | 2 +- .../opik/api_objects/dataset/converters.py | 5 - .../src/opik/api_objects/dataset/dataset.py | 12 +- .../opik/api_objects/dataset/dataset_item.py | 7 +- sdks/python/src/opik/config.py | 2 +- sdks/python/src/opik/evaluation/evaluator.py | 12 +- .../llm_judges/hallucination/metric.py | 4 +- .../src/opik/evaluation/scoring_runner.py | 76 ------------ .../python/src/opik/evaluation/task_runner.py | 74 ----------- .../src/opik/evaluation/tasks_scorer.py | 115 ++++++++++++++++++ sdks/python/src/opik/plugins/__init__.py | 0 .../src/opik/plugins/pytest/decorator.py | 25 ++-- sdks/python/src/opik/plugins/pytest/hooks.py | 1 - 15 files changed, 151 insertions(+), 188 deletions(-) delete mode 100644 sdks/python/src/opik/evaluation/scoring_runner.py delete mode 100644 sdks/python/src/opik/evaluation/task_runner.py create mode 100644 sdks/python/src/opik/evaluation/tasks_scorer.py create mode 100644 sdks/python/src/opik/plugins/__init__.py diff --git a/.github/workflows/lib-langchain-tests.yml b/.github/workflows/lib-langchain-tests.yml index 50edeb6da9..7819526fcf 100644 --- a/.github/workflows/lib-langchain-tests.yml +++ b/.github/workflows/lib-langchain-tests.yml @@ -33,7 +33,7 @@ jobs: python-version: ${{matrix.python_version}} - name: Install opik - run: pip install -e . + run: pip install . - name: Install test tools run: | diff --git a/.github/workflows/lib-openai-tests.yml b/.github/workflows/lib-openai-tests.yml index f336016df6..66db25daab 100644 --- a/.github/workflows/lib-openai-tests.yml +++ b/.github/workflows/lib-openai-tests.yml @@ -33,7 +33,7 @@ jobs: python-version: ${{matrix.python_version}} - name: Install opik - run: pip install -e . + run: pip install . - name: Install test tools run: | diff --git a/.github/workflows/python_sdk_unit_tests.yml b/.github/workflows/python_sdk_unit_tests.yml index a8fb345f8f..843adc8722 100644 --- a/.github/workflows/python_sdk_unit_tests.yml +++ b/.github/workflows/python_sdk_unit_tests.yml @@ -37,7 +37,7 @@ jobs: python-version: ${{ matrix.python_version }} - name: Install opik - run: pip install -e . + run: pip install . 
- name: Install test requirements run: | diff --git a/sdks/python/src/opik/api_objects/dataset/converters.py b/sdks/python/src/opik/api_objects/dataset/converters.py index cf94b2c2a1..5c5a0b9754 100644 --- a/sdks/python/src/opik/api_objects/dataset/converters.py +++ b/sdks/python/src/opik/api_objects/dataset/converters.py @@ -69,8 +69,3 @@ def from_json( result.append(dataset_item.DatasetItem(**item_kwargs)) return result - - -def from_dicts(dict_items: List[Dict[str, Any]]) -> List[dataset_item.DatasetItem]: - items = [dataset_item.DatasetItem(**dict_item) for dict_item in dict_items] - return items diff --git a/sdks/python/src/opik/api_objects/dataset/dataset.py b/sdks/python/src/opik/api_objects/dataset/dataset.py index 6da675882e..70cd096357 100644 --- a/sdks/python/src/opik/api_objects/dataset/dataset.py +++ b/sdks/python/src/opik/api_objects/dataset/dataset.py @@ -1,6 +1,6 @@ import logging import json -from typing import Optional, Any, List, Dict, Union, cast +from typing import Optional, Any, List, Dict, Union, Sequence from opik.rest_api import client as rest_api_client from opik.rest_api.types import dataset_item as rest_dataset_item @@ -39,7 +39,7 @@ def description(self) -> Optional[str]: return self._description def insert( - self, items: Union[List[dataset_item.DatasetItem], List[Dict[str, Any]]] + self, items: Sequence[Union[dataset_item.DatasetItem, Dict[str, Any]]] ) -> None: """ Insert new items into the dataset. @@ -48,10 +48,10 @@ def insert( items: List of DatasetItem objects or dicts (which will be converted to DatasetItem objects) to add to the dataset. """ - if len(items) > 0 and isinstance(items[0], dict): - items = converters.from_dicts(items) # type: ignore - - items = cast(List[dataset_item.DatasetItem], items) + items: List[dataset_item.DatasetItem] = [ # type: ignore + (dataset_item.DatasetItem(**item) if isinstance(item, dict) else item) + for item in items + ] rest_items = [ rest_dataset_item.DatasetItem( diff --git a/sdks/python/src/opik/api_objects/dataset/dataset_item.py b/sdks/python/src/opik/api_objects/dataset/dataset_item.py index 31d1ae6aa5..8af934392c 100644 --- a/sdks/python/src/opik/api_objects/dataset/dataset_item.py +++ b/sdks/python/src/opik/api_objects/dataset/dataset_item.py @@ -1,12 +1,13 @@ from typing import Optional, Dict, Any -import dataclasses +import pydantic from .. import constants -@dataclasses.dataclass -class DatasetItem: +class DatasetItem(pydantic.BaseModel): """A DatasetItem object representing an item in a dataset.""" + model_config = pydantic.ConfigDict(strict=True) + input: Dict[str, Any] """The input data for the dataset item.""" diff --git a/sdks/python/src/opik/config.py b/sdks/python/src/opik/config.py index b43c764268..6549e7c736 100644 --- a/sdks/python/src/opik/config.py +++ b/sdks/python/src/opik/config.py @@ -38,7 +38,7 @@ def settings_customise_sources( # Below are Opik configurations - url_override: str = "https://comet.com/opik/api" + url_override: str = "https://www.comet.com/opik/api" """Opik backend base URL""" project_name: str = "Default Project" diff --git a/sdks/python/src/opik/evaluation/evaluator.py b/sdks/python/src/opik/evaluation/evaluator.py index 1de47c3a3c..2582ba83f9 100644 --- a/sdks/python/src/opik/evaluation/evaluator.py +++ b/sdks/python/src/opik/evaluation/evaluator.py @@ -7,7 +7,7 @@ from ..api_objects.experiment import experiment_item from ..api_objects import opik_client -from . import task_runner, test_result, scoring_runner, scores_logger, report +from . 
import tasks_scorer, test_result, scores_logger, report def evaluate( @@ -47,18 +47,12 @@ def evaluate( client = opik_client.get_client_cached() start_time = time.time() - test_cases = task_runner.run( + test_results = tasks_scorer.run( client=client, dataset_=dataset, task=task, - workers=task_threads, - verbose=verbose, - ) - - test_results = scoring_runner.run( - test_cases=test_cases, scoring_metrics=scoring_metrics, - workers=scoring_threads, + workers=task_threads, verbose=verbose, ) diff --git a/sdks/python/src/opik/evaluation/metrics/llm_judges/hallucination/metric.py b/sdks/python/src/opik/evaluation/metrics/llm_judges/hallucination/metric.py index d5f32ff99b..a7bd47aea2 100644 --- a/sdks/python/src/opik/evaluation/metrics/llm_judges/hallucination/metric.py +++ b/sdks/python/src/opik/evaluation/metrics/llm_judges/hallucination/metric.py @@ -114,7 +114,9 @@ def _parse_model_output(self, content: str) -> score_result.ScoreResult: verdict: str = dict_content[template.VERDICT_KEY] score = 1.0 if verdict.lower() == template.HALLUCINATION_VERDICT else 0.0 return score_result.ScoreResult( - name=self.name, value=score, reason=dict_content[template.REASON_KEY] + name=self.name, + value=score, + reason=str(dict_content[template.REASON_KEY]), ) except Exception: raise exceptions.MetricComputationError( diff --git a/sdks/python/src/opik/evaluation/scoring_runner.py b/sdks/python/src/opik/evaluation/scoring_runner.py deleted file mode 100644 index ba704291d5..0000000000 --- a/sdks/python/src/opik/evaluation/scoring_runner.py +++ /dev/null @@ -1,76 +0,0 @@ -from typing import List -from concurrent import futures - -from .metrics import base_metric, score_result - -from . import test_result, test_case - -import tqdm - - -def _process_test_case( - test_case_: test_case.TestCase, scoring_metrics: List[base_metric.BaseMetric] -) -> test_result.TestResult: - score_results = [] - for metric in scoring_metrics: - try: - result = metric.score( - **test_case_.task_output.model_dump(exclude_none=False) - ) - if isinstance(result, list): - score_results += result - else: - score_results.append(result) - except Exception as e: - # This can be problematic if the metric returns a list of strings as we will not know the name of the metrics that have failed - score_results.append( - score_result.ScoreResult( - name=metric.name, value=0.0, reason=str(e), scoring_failed=True - ) - ) - - test_result_ = test_result.TestResult( - test_case=test_case_, score_results=score_results - ) - - return test_result_ - - -def run( - test_cases: List[test_case.TestCase], - scoring_metrics: List[base_metric.BaseMetric], - workers: int, - verbose: int, -) -> List[test_result.TestResult]: - test_results: List[test_result.TestResult] - - if workers == 1: - test_results = [ - _process_test_case(test_case_, scoring_metrics) - for test_case_ in tqdm.tqdm( - test_cases, - disable=(verbose < 1), - desc="Scoring outputs", - total=len(test_cases), - ) - ] - - return test_results - - with futures.ThreadPoolExecutor(max_workers=workers) as pool: - test_result_futures = [ - pool.submit(_process_test_case, test_case_, scoring_metrics) - for test_case_ in test_cases - ] - - test_results = [ - test_result_future.result() - for test_result_future in tqdm.tqdm( - futures.as_completed(test_result_futures), - disable=(verbose < 1), - desc="Scoring outputs", - total=len(test_result_futures), - ) - ] - - return test_results diff --git a/sdks/python/src/opik/evaluation/task_runner.py b/sdks/python/src/opik/evaluation/task_runner.py deleted file mode 
100644 index 2543a7e289..0000000000 --- a/sdks/python/src/opik/evaluation/task_runner.py +++ /dev/null @@ -1,74 +0,0 @@ -import tqdm -from concurrent import futures - -from typing import List -from .types import LLMTask -from opik.api_objects.dataset import dataset, dataset_item -from opik.api_objects import opik_client -from opik import context_storage - -from . import task_output, test_case - - -def _process_item( - client: opik_client.Opik, item: dataset_item.DatasetItem, task: LLMTask -) -> test_case.TestCase: - assert item.id is not None - - try: - trace = client.trace(input=item.input, name="evaluation_task") - context_storage.set_trace(trace) - task_output_ = task(item) - trace.end(output=task_output_) - - test_case_ = test_case.TestCase( - trace_id=trace.id, - dataset_item_id=item.id, - task_output=task_output.TaskOutput(**task_output_), - ) - return test_case_ - - finally: - context_storage.pop_trace() - - -def run( - client: opik_client.Opik, - dataset_: dataset.Dataset, - task: LLMTask, - workers: int, - verbose: int, -) -> List[test_case.TestCase]: - dataset_items = dataset_.get_all_items() - test_cases: List[test_case.TestCase] - - if workers == 1: - test_cases = [ - _process_item(client, item, task) - for item in tqdm.tqdm( - dataset_items, - disable=(verbose < 1), - desc="Running tasks", - total=len(dataset_items), - ) - ] - return test_cases - - with futures.ThreadPoolExecutor(max_workers=workers) as pool: - test_case_futures = [ - pool.submit(_process_item, client, item, task) for item in dataset_items - ] - - test_cases = [ - test_case_future.result() - for test_case_future in tqdm.tqdm( - futures.as_completed( - test_case_futures, - ), - disable=(verbose < 1), - desc="Running tasks", - total=len(test_case_futures), - ) - ] - - return test_cases diff --git a/sdks/python/src/opik/evaluation/tasks_scorer.py b/sdks/python/src/opik/evaluation/tasks_scorer.py new file mode 100644 index 0000000000..6d2e6a85e4 --- /dev/null +++ b/sdks/python/src/opik/evaluation/tasks_scorer.py @@ -0,0 +1,115 @@ +import tqdm +from concurrent import futures + +from typing import List +from .types import LLMTask +from opik.api_objects.dataset import dataset, dataset_item +from opik.api_objects import opik_client +from opik import context_storage + +from . 
import task_output, test_case, test_result +from .metrics import score_result, base_metric + + +def _score_test_case( + test_case_: test_case.TestCase, scoring_metrics: List[base_metric.BaseMetric] +) -> test_result.TestResult: + score_results = [] + for metric in scoring_metrics: + try: + result = metric.score( + **test_case_.task_output.model_dump(exclude_none=False) + ) + if isinstance(result, list): + score_results += result + else: + score_results.append(result) + except Exception as e: + # This can be problematic if the metric returns a list of strings as we will not know the name of the metrics that have failed + score_results.append( + score_result.ScoreResult( + name=metric.name, value=0.0, reason=str(e), scoring_failed=True + ) + ) + + test_result_ = test_result.TestResult( + test_case=test_case_, score_results=score_results + ) + + return test_result_ + + +def _process_item( + client: opik_client.Opik, + item: dataset_item.DatasetItem, + task: LLMTask, + scoring_metrics: List[base_metric.BaseMetric], +) -> test_result.TestResult: + assert item.id is not None + + try: + trace = client.trace(input=item.input, name="evaluation_task") + context_storage.set_trace(trace) + task_output_ = task(item) + trace.end(output=task_output_) + + test_case_ = test_case.TestCase( + trace_id=trace.id, + dataset_item_id=item.id, + task_output=task_output.TaskOutput(**task_output_), + ) + + test_result_ = _score_test_case( + test_case_=test_case_, scoring_metrics=scoring_metrics + ) + + return test_result_ + + finally: + context_storage.pop_trace() + + +def run( + client: opik_client.Opik, + dataset_: dataset.Dataset, + task: LLMTask, + scoring_metrics: List[base_metric.BaseMetric], + workers: int, + verbose: int, +) -> List[test_result.TestResult]: + dataset_items = dataset_.get_all_items() + test_cases: List[test_result.TestResult] + + if workers == 1: + test_cases = [ + _process_item( + client=client, item=item, task=task, scoring_metrics=scoring_metrics + ) + for item in tqdm.tqdm( + dataset_items, + disable=(verbose < 1), + desc="Evaluation", + total=len(dataset_items), + ) + ] + return test_cases + + with futures.ThreadPoolExecutor(max_workers=workers) as pool: + test_case_futures = [ + pool.submit(_process_item, client, item, task, scoring_metrics) + for item in dataset_items + ] + + test_cases = [ + test_case_future.result() + for test_case_future in tqdm.tqdm( + futures.as_completed( + test_case_futures, + ), + disable=(verbose < 1), + desc="Evaluation", + total=len(test_case_futures), + ) + ] + + return test_cases diff --git a/sdks/python/src/opik/plugins/__init__.py b/sdks/python/src/opik/plugins/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/src/opik/plugins/pytest/decorator.py b/sdks/python/src/opik/plugins/pytest/decorator.py index d05ad978a7..d727ffd7aa 100644 --- a/sdks/python/src/opik/plugins/pytest/decorator.py +++ b/sdks/python/src/opik/plugins/pytest/decorator.py @@ -50,8 +50,10 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: test_trace is not None and test_span is not None ), "Must not be None here by design assumption" - node_id: str = os.environ["PYTEST_CURRENT_TEST"].split(" ")[0] + node_id: str = _get_test_nodeid() test_runs_storage.LLM_UNIT_TEST_RUNS.add(node_id) + print(test_runs_storage.LLM_UNIT_TEST_RUNS) + test_run_content_ = _get_test_run_content( func=func, args=args, @@ -59,12 +61,12 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: argnames_mapping=argnames_mapping, ) + trace_input = {**test_run_content_.input} + 
trace_input.pop("test_name") # we don't need it in traces test_trace.update( - input=test_run_content_.input, metadata=test_run_content_.metadata - ) - test_span.update( - input=test_run_content_.input, metadata=test_run_content_.metadata + input=trace_input, metadata=test_run_content_.metadata ) + test_span.update(input=trace_input, metadata=test_run_content_.metadata) test_runs_storage.TEST_RUNS_TRACES[node_id] = test_trace test_runs_storage.TEST_RUNS_CONTENTS[node_id] = test_run_content_ @@ -83,8 +85,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return decorator -def _get_test_name() -> str: - return os.environ["PYTEST_CURRENT_TEST"].split("/")[-1].split(" ")[0] +def _get_test_nodeid() -> str: + # Examples of environment variables: + # 'sdks/python/tests/tests_sandbox/test_things.py::TestGroup::test_example[13 32] (call)' + # 'sdks/python/tests/tests_sandbox/test_things.py::TestGroup::test_example (call)' + # 'sdks/python/tests/tests_sandbox/test_things.py::test_example (call)' + + return os.environ["PYTEST_CURRENT_TEST"].rpartition(" ")[0] def _get_test_run_content( @@ -99,9 +106,9 @@ def _get_test_run_content( expected_output = test_inputs.get(argnames_mapping["expected_output"], None) if not isinstance(input, dict): - input = {"test_name": _get_test_name(), "input": input} + input = {"test_name": _get_test_nodeid(), "input": input} else: - input = {"test_name": _get_test_name(), **input} + input = {"test_name": _get_test_nodeid(), **input} if expected_output is not None and not isinstance(expected_output, dict): expected_output = {"expected_output": expected_output} diff --git a/sdks/python/src/opik/plugins/pytest/hooks.py b/sdks/python/src/opik/plugins/pytest/hooks.py index d02d1fb532..aa684fd0f3 100644 --- a/sdks/python/src/opik/plugins/pytest/hooks.py +++ b/sdks/python/src/opik/plugins/pytest/hooks.py @@ -29,7 +29,6 @@ def pytest_sessionfinish(session: "Session", exitstatus: Any) -> None: for test_item in session.items if test_item.nodeid in test_runs_storage.LLM_UNIT_TEST_RUNS ] - if len(llm_test_items) == 0: return