diff --git a/.github/workflows/lib-dspy-tests.yml b/.github/workflows/lib-dspy-tests.yml new file mode 100644 index 0000000000..1b6f6c548c --- /dev/null +++ b/.github/workflows/lib-dspy-tests.yml @@ -0,0 +1,51 @@ +# Workflow to run DSPy tests +# +# Please read inputs to provide correct values. +# +name: SDK Lib DSPy Tests +run-name: "SDK Lib DSPy Tests ${{ github.ref_name }} by @${{ github.actor }}" +env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + OPENAI_ORG_ID: ${{ secrets.OPENAI_ORG_ID }} +on: + workflow_call: + +jobs: + tests: + name: DSPy Python ${{matrix.python_version}} + runs-on: ubuntu-latest + defaults: + run: + working-directory: sdks/python + + strategy: + fail-fast: true + matrix: + python_version: ["3.9", "3.10", "3.11", "3.12"] + + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Setup Python ${{matrix.python_version}} + uses: actions/setup-python@v5 + with: + python-version: ${{matrix.python_version}} + + - name: Install opik + run: pip install . + + - name: Install test tools + run: | + cd ./tests + pip install --no-cache-dir --disable-pip-version-check -r test_requirements.txt + + - name: Install lib + run: | + cd ./tests + pip install --no-cache-dir --disable-pip-version-check -r library_integration/dspy/requirements.txt + + - name: Run tests + run: | + cd ./tests/library_integration/dspy/ + python -m pytest -vv . 
\ No newline at end of file diff --git a/.github/workflows/lib-integration-tests-runner.yml b/.github/workflows/lib-integration-tests-runner.yml index 8885dab222..4bf3d12b7e 100644 --- a/.github/workflows/lib-integration-tests-runner.yml +++ b/.github/workflows/lib-integration-tests-runner.yml @@ -18,6 +18,7 @@ on: - aisuite - haystack - guardrails + - dspy schedule: - cron: "0 0 */1 * *" pull_request: @@ -87,3 +88,9 @@ jobs: if: contains(fromJSON('["guardrails", "all"]'), needs.init_environment.outputs.LIBS) uses: ./.github/workflows/lib-guardrails-tests.yml secrets: inherit + + dspy_tests: + needs: [init_environment] + if: contains(fromJSON('["dspy", "all"]'), needs.init_environment.outputs.LIBS) + uses: ./.github/workflows/lib-dspy-tests.yml + secrets: inherit diff --git a/sdks/python/src/opik/integrations/dspy/__init__.py b/sdks/python/src/opik/integrations/dspy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py new file mode 100644 index 0000000000..52b8fdc340 --- /dev/null +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -0,0 +1,246 @@ +from contextvars import ContextVar, Token +from typing import Any, Dict, Optional, Union + +import dspy +from dspy.utils.callback import BaseCallback + +from opik import opik_context +from opik.api_objects import helpers, span, trace +from opik.api_objects.opik_client import get_client_cached +from opik.decorator import error_info_collector + +ContextType = Union[span.SpanData, trace.TraceData] + + +class OpikCallback(BaseCallback): + def __init__( + self, + project_name: Optional[str] = None, + ): + self._map_call_id_to_span_data: Dict[str, span.SpanData] = {} + self._map_call_id_to_trace_data: Dict[str, trace.TraceData] = {} + self._map_span_id_or_trace_id_to_token: Dict[str, Token] = {} + + self._current_callback_context: ContextVar[Optional[ContextType]] = ContextVar( + "opik_context", 
default=None + ) + + self._project_name = project_name + + self._opik_client = get_client_cached() + + def on_module_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + if current_callback_context_data := self._current_callback_context.get(): + if isinstance(current_callback_context_data, span.SpanData): + self._attach_span_to_existing_span( + call_id=call_id, + current_span_data=current_callback_context_data, + instance=instance, + inputs=inputs, + ) + else: + self._attach_span_to_existing_trace( + call_id=call_id, + current_trace_data=current_callback_context_data, + instance=instance, + inputs=inputs, + ) + return + + if current_span_data := opik_context.get_current_span_data(): + self._attach_span_to_existing_span( + call_id=call_id, + current_span_data=current_span_data, + instance=instance, + inputs=inputs, + ) + new_span_data = self._map_call_id_to_span_data[call_id] + self._callback_context_set(new_span_data) + return + + if current_trace_data := opik_context.get_current_trace_data(): + self._attach_span_to_existing_trace( + call_id=call_id, + current_trace_data=current_trace_data, + instance=instance, + inputs=inputs, + ) + new_span_data = self._map_call_id_to_span_data[call_id] + self._callback_context_set(new_span_data) + return + + self._start_trace( + call_id=call_id, + instance=instance, + inputs=inputs, + ) + + def _attach_span_to_existing_span( + self, + call_id: str, + current_span_data: span.SpanData, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + project_name = helpers.resolve_child_span_project_name( + parent_project_name=current_span_data.project_name, + child_project_name=self._project_name, + ) + span_type = self._get_span_type(instance) + + span_data = span.SpanData( + trace_id=current_span_data.trace_id, + parent_span_id=current_span_data.id, + name=instance.__class__.__name__, + input=inputs, + type=span_type, + project_name=project_name, + ) + self._map_call_id_to_span_data[call_id] = 
span_data + + def _attach_span_to_existing_trace( + self, + call_id: str, + current_trace_data: trace.TraceData, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + project_name = helpers.resolve_child_span_project_name( + current_trace_data.project_name, + self._project_name, + ) + span_type = self._get_span_type(instance) + + span_data = span.SpanData( + trace_id=current_trace_data.id, + parent_span_id=None, + name=instance.__class__.__name__, + input=inputs, + type=span_type, + project_name=project_name, + ) + self._map_call_id_to_span_data[call_id] = span_data + + def _start_trace( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + trace_data = trace.TraceData( + name=instance.__class__.__name__, + input=inputs, + metadata={"created_from": "dspy"}, + project_name=self._project_name, + ) + self._map_call_id_to_trace_data[call_id] = trace_data + self._callback_context_set(trace_data) + + def on_module_end( + self, + call_id: str, + outputs: Optional[Any], + exception: Optional[Exception] = None, + ) -> None: + self._end_span( + call_id=call_id, + exception=exception, + outputs=outputs, + ) + self._end_trace(call_id=call_id) + + def _end_trace(self, call_id: str) -> None: + if trace_data := self._map_call_id_to_trace_data.pop(call_id, None): + trace_data.init_end_time() + self._opik_client.trace(**trace_data.__dict__) + + # remove trace data from context + if token := self._map_span_id_or_trace_id_to_token.pop(trace_data.id, None): + self._current_callback_context.reset(token) + + def _end_span( + self, + call_id: str, + outputs: Optional[Any], + exception: Optional[Exception] = None, + ) -> None: + if span_data := self._map_call_id_to_span_data.pop(call_id, None): + if exception: + error_info = error_info_collector.collect(exception) + span_data.update(error_info=error_info) + + span_data.update(output={"output": outputs}).init_end_time() + self._opik_client.span(**span_data.__dict__) + + # remove span data from context + if 
token := self._map_span_id_or_trace_id_to_token.pop(span_data.id, None): + self._current_callback_context.reset(token) + + def on_lm_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + current_callback_context_data = self._current_callback_context.get() + assert current_callback_context_data is not None + + project_name = helpers.resolve_child_span_project_name( + current_callback_context_data.project_name, + self._project_name, + ) + + if isinstance(current_callback_context_data, span.SpanData): + trace_id = current_callback_context_data.trace_id + parent_span_id = current_callback_context_data.id + else: + trace_id = current_callback_context_data.id + parent_span_id = None + + provider, model = instance.model.split(r"/", 1) + span_type = self._get_span_type(instance) + + span_data = span.SpanData( + trace_id=trace_id, + name=instance.__class__.__name__, + parent_span_id=parent_span_id, + type=span_type, + input=inputs, + project_name=project_name, + provider=provider, + model=model, + ) + self._map_call_id_to_span_data[call_id] = span_data + + def on_lm_end( + self, + call_id: str, + outputs: Optional[Dict[str, Any]], + exception: Optional[Exception] = None, + ) -> None: + self._end_span( + call_id=call_id, + exception=exception, + outputs=outputs, + ) + self._end_trace(call_id=call_id) + + def flush(self) -> None: + """Sends pending Opik data to the backend""" + self._opik_client.flush() + + def _callback_context_set(self, value: ContextType) -> None: + token = self._current_callback_context.set(value) + self._map_span_id_or_trace_id_to_token[value.id] = token + + def _get_span_type(self, instance: Any) -> span.SpanType: + if isinstance(instance, dspy.Predict): + return "llm" + elif isinstance(instance, dspy.LM): + return "llm" + return "general" diff --git a/sdks/python/src/opik/types.py b/sdks/python/src/opik/types.py index 4dd2b71efb..18ccd572c0 100644 --- a/sdks/python/src/opik/types.py +++ b/sdks/python/src/opik/types.py 
@@ -66,7 +66,7 @@ class FeedbackScoreDict(TypedDict): class ErrorInfoDict(TypedDict): """ - A TypedDict representing the information about the error occured. + A TypedDict representing the information about the error that occurred. """ exception_type: str diff --git a/sdks/python/tests/library_integration/dspy/__init__.py b/sdks/python/tests/library_integration/dspy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/tests/library_integration/dspy/requirements.txt b/sdks/python/tests/library_integration/dspy/requirements.txt new file mode 100644 index 0000000000..65525d8942 --- /dev/null +++ b/sdks/python/tests/library_integration/dspy/requirements.txt @@ -0,0 +1 @@ +dspy diff --git a/sdks/python/tests/library_integration/dspy/test_dspy.py b/sdks/python/tests/library_integration/dspy/test_dspy.py new file mode 100644 index 0000000000..02d0516bda --- /dev/null +++ b/sdks/python/tests/library_integration/dspy/test_dspy.py @@ -0,0 +1,480 @@ +from typing import Union + +import dspy +import pytest + +import opik +from opik import context_storage +from opik.api_objects import opik_client, span, trace +from opik.config import OPIK_PROJECT_DEFAULT_NAME +from opik.integrations.dspy.callback import OpikCallback +from ...testlib import ( + ANY_BUT_NONE, + ANY_DICT, + ANY_STRING, + SpanModel, + TraceModel, + assert_equal, +) + + +def sort_spans_by_name(tree: Union[SpanModel, TraceModel]) -> None: + """ + Sorts the spans within a trace/span tree by their names in ascending order.
+ """ + tree.spans = sorted(tree.spans, key=lambda span: span.name) + + +@pytest.mark.parametrize( + "project_name, expected_project_name", + [ + (None, OPIK_PROJECT_DEFAULT_NAME), + ("dspy-integration-test", "dspy-integration-test"), + ], +) +def test_dspy__happyflow( + fake_backend, + project_name, + expected_project_name, +): + lm = dspy.LM( + cache=False, + model="openai/gpt-4o-mini", + ) + dspy.configure(lm=lm) + + opik_callback = OpikCallback(project_name=project_name) + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_STRING(), + name="ChainOfThought", + input={"args": (), "kwargs": {"question": "What is the meaning of life?"}}, + output=None, + metadata={"created_from": "dspy"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-4o-mini", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[], + ), + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 2 + + sort_spans_by_name(EXPECTED_TRACE_TREE) + sort_spans_by_name(fake_backend.trace_trees[0]) + + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy__openai_llm_is_used__error_occurred_during_openai_call__error_info_is_logged( + fake_backend, +): + lm = dspy.LM( + cache=False, + model="openai/gpt-3.5-turbo", + api_key="incorrect-api-key", + ) + dspy.configure(lm=lm) + + 
project_name = "dspy-integration-test" + opik_callback = OpikCallback(project_name=project_name) + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + + with pytest.raises(Exception): + cot(question="What is the meaning of life?") + + opik_callback.flush() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_STRING(), + name="ChainOfThought", + input={"args": (), "kwargs": {"question": "What is the meaning of life?"}}, + output=None, + metadata={"created_from": "dspy"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + error_info={ + "exception_type": ANY_STRING(), + "message": ANY_STRING(), + "traceback": ANY_STRING(), + }, + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + error_info={ + "exception_type": ANY_STRING(), + "message": ANY_STRING(), + "traceback": ANY_STRING(), + }, + ), + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 2 + + sort_spans_by_name(EXPECTED_TRACE_TREE) + sort_spans_by_name(fake_backend.trace_trees[0]) + + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy_callback__used_inside_another_track_function__data_attached_to_existing_trace_tree( + fake_backend, +): + project_name = "dspy-integration-test" + + @opik.track(project_name=project_name, capture_output=True) + def f(x): + lm = dspy.LM( + cache=False, + model="openai/gpt-3.5-turbo", + ) + dspy.configure(lm=lm) + + opik_callback = OpikCallback(project_name=project_name) + 
dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + return "the-output" + + f("the-input") + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="f", + input={"x": "the-input"}, + output={"output": "the-output"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name="f", + type="general", + input={"x": "the-input"}, + output={"output": "the-output"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + name="ChainOfThought", + input={ + "args": (), + "kwargs": {"question": "What is the meaning of life?"}, + }, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + ), + ], + ) + ], + ) + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 1 + + sort_spans_by_name(EXPECTED_TRACE_TREE.spans[0].spans[0]) + sort_spans_by_name(fake_backend.trace_trees[0].spans[0].spans[0]) + + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy_callback__used_when_there_was_already_existing_trace_without_span__data_attached_to_existing_trace( + fake_backend, +): + def f(): + lm = dspy.LM( + cache=False, + model="openai/gpt-3.5-turbo", + ) + 
dspy.configure(lm=lm) + + opik_callback = OpikCallback() + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + client = opik_client.get_client_cached() + + # Prepare context to have manually created trace data + trace_data = trace.TraceData( + name="manually-created-trace", + input={"input": "input-of-manually-created-trace"}, + ) + context_storage.set_trace_data(trace_data) + + f() + + # Send trace data + trace_data = context_storage.pop_trace_data() + trace_data.init_end_time().update( + output={"output": "output-of-manually-created-trace"} + ) + client.trace(**trace_data.__dict__) + + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_STRING(), + name="manually-created-trace", + input={"input": "input-of-manually-created-trace"}, + output={"output": "output-of-manually-created-trace"}, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[ + SpanModel( + id=ANY_STRING(), + name="ChainOfThought", + input={ + "args": (), + "kwargs": {"question": "What is the meaning of life?"}, + }, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + ], + ) + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 1 + + sort_spans_by_name(EXPECTED_TRACE_TREE.spans[0]) + sort_spans_by_name(fake_backend.trace_trees[0].spans[0]) + + 
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy_callback__used_when_there_was_already_existing_span_without_trace__data_attached_to_existing_span( + fake_backend, +): + def f(): + lm = dspy.LM( + cache=False, + model="openai/gpt-3.5-turbo", + ) + dspy.configure(lm=lm) + + opik_callback = OpikCallback() + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + client = opik_client.get_client_cached() + span_data = span.SpanData( + trace_id="some-trace-id", + name="manually-created-span", + input={"input": "input-of-manually-created-span"}, + ) + context_storage.add_span_data(span_data) + + f() + + span_data = context_storage.pop_span_data() + span_data.init_end_time().update( + output={"output": "output-of-manually-created-span"} + ) + client.span(**span_data.__dict__) + opik.flush_tracker() + + EXPECTED_SPANS_TREE = SpanModel( + id=ANY_STRING(), + name="manually-created-span", + input={"input": "input-of-manually-created-span"}, + output={"output": "output-of-manually-created-span"}, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[ + SpanModel( + id=ANY_STRING(), + name="ChainOfThought", + input={ + "args": (), + "kwargs": {"question": "What is the meaning of life?"}, + }, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + ], + ) + ], + ) + + assert 
len(fake_backend.span_trees) == 1 + + sort_spans_by_name(EXPECTED_SPANS_TREE.spans[0]) + sort_spans_by_name(fake_backend.span_trees[0].spans[0]) + + assert_equal(EXPECTED_SPANS_TREE, fake_backend.span_trees[0])