From 81f14e8f8f46c38a564873e844633b4073ab3647 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Thu, 19 Dec 2024 18:06:39 +0000 Subject: [PATCH 01/13] OPIK-615 [SDK] DSPY integration --- .../src/opik/integrations/dspy/__init__.py | 0 .../src/opik/integrations/dspy/callback.py | 99 +++++++++++++++++++ sdks/python/src/opik/types.py | 2 +- 3 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 sdks/python/src/opik/integrations/dspy/__init__.py create mode 100644 sdks/python/src/opik/integrations/dspy/callback.py diff --git a/sdks/python/src/opik/integrations/dspy/__init__.py b/sdks/python/src/opik/integrations/dspy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py new file mode 100644 index 0000000000..b2e5106283 --- /dev/null +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -0,0 +1,99 @@ +from typing import Any, Dict, Optional + +from dspy.utils.callback import BaseCallback + +from opik import context_storage, opik_context +from opik.api_objects import opik_client, span, trace +from opik.decorator import error_info_collector + + +class OpikCallback(BaseCallback): + + def __init__( + self, + project_name: Optional[str] = None, + ): + self._opik_trace_data: Optional[trace.TraceData] = None + self._project_name = project_name + self._opik_client = opik_client.Opik( + _use_batching=True, + project_name=project_name, + ) + + def on_module_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + print(f"on_module_start() is called with call_id: {call_id}, instance: {instance.__class__.__name__}, inputs: {inputs}") + + if self._opik_trace_data is not None: + return + + existing_trace_data = opik_context.get_current_trace_data() + if existing_trace_data: + self._opik_trace_data = existing_trace_data + else: + trace_data = trace.TraceData( + name=instance.__class__.__name__, + metadata={"created_from": "dspy"}, + project_name=self._project_name, + ) + self._opik_trace_data = trace_data + + def on_module_end( + self, + call_id: str, + outputs: Optional[Any], + exception: Optional[Exception] = None, + ): + print(f"on_module_end() is called with call_id: {call_id}, outputs: {outputs}, exception: {exception}") + + if self._opik_trace_data is None: + return + + if exception: + error_info = error_info_collector.collect(exception) + self._opik_trace_data.update(error_info=error_info) + + self._opik_trace_data.init_end_time() + self._opik_client.trace(**self._opik_trace_data.__dict__) + self._opik_trace_data = None + + def on_lm_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + print(f"LM is called with inputs: {inputs}") + + span_data = span.SpanData( + trace_id=self._opik_trace_data.id, + name=instance.__class__.__name__, + parent_span_id=None, + type="llm", + input=inputs, + project_name=self._opik_trace_data.project_name, + ) + context_storage.add_span_data(span_data) + + + def on_lm_end( + self, + call_id: str, + outputs: Optional[Dict[str, Any]], + exception: Optional[Exception] = None, + ): + print(f"LM is finished with outputs: {outputs}") + + span_data = context_storage.pop_span_data() + + if exception: + error_info = error_info_collector.collect(exception) + span_data.update(error_info=error_info) + + span_data.update(output={"output": outputs}).init_end_time() + + self._opik_client.span(**span_data.__dict__) diff --git a/sdks/python/src/opik/types.py b/sdks/python/src/opik/types.py index 4dd2b71efb..18ccd572c0 100644 --- a/sdks/python/src/opik/types.py +++ b/sdks/python/src/opik/types.py @@ -66,7 +66,7 @@ class FeedbackScoreDict(TypedDict): class ErrorInfoDict(TypedDict): """ - A TypedDict representing the information about the error occured. + A TypedDict representing the information about the error occurred. """ exception_type: str From 918a9337fbc21de18fa1ee2766d82a5571aed6de Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Mon, 23 Dec 2024 17:35:49 +0000 Subject: [PATCH 02/13] wip --- .../src/opik/integrations/dspy/callback.py | 187 ++++++++++++++---- 1 file changed, 150 insertions(+), 37 deletions(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index b2e5106283..210247e4e2 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -1,19 +1,28 @@ -from typing import Any, Dict, Optional +from contextvars import ContextVar +from typing import Any, Dict, Optional, Set from dspy.utils.callback import BaseCallback from opik import context_storage, opik_context -from opik.api_objects import opik_client, span, trace +from opik.api_objects import helpers, opik_client, span, trace from opik.decorator import error_info_collector +_OPIK_CONTEXT = ContextVar("opik_context", default=None) + + class OpikCallback(BaseCallback): def __init__( self, project_name: Optional[str] = None, ): - self._opik_trace_data: Optional[trace.TraceData] = None + self._map_call_id_to_span_data: Dict[str, span.SpanData] = {} + self._map_call_id_to_trace_data: Dict[str, trace.TraceData] = {} + self._externally_created_traces_ids: Set[str] = set() + + self._current_context: ContextVar[Optional[span.SpanData]] = _OPIK_CONTEXT + self._project_name = project_name self._opik_client = opik_client.Opik( _use_batching=True, @@ -25,22 +34,117 @@ def on_module_start( call_id: str, instance: Any, inputs: Dict[str, Any], - ): + ) -> None: print(f"on_module_start() is called with call_id: {call_id}, instance: {instance.__class__.__name__}, inputs: {inputs}") - if self._opik_trace_data is not None: + if current_context := self._current_context.get(): + self._attach_span_to_existing_span( + call_id=call_id, + current_span_data=current_context, + instance=instance, + inputs=inputs, + ) + return + + if current_span_data := opik_context.get_current_span_data(): + self._current_context.set(current_span_data) + self._attach_span_to_existing_span( + call_id=call_id, + current_span_data=current_span_data, + instance=instance, + inputs=inputs, + ) return - existing_trace_data = opik_context.get_current_trace_data() - if existing_trace_data: - self._opik_trace_data = existing_trace_data - else: - trace_data = trace.TraceData( - name=instance.__class__.__name__, - metadata={"created_from": "dspy"}, - project_name=self._project_name, + if current_trace_data := opik_context.get_current_trace_data(): + self._attach_span_to_existing_trace( + call_id=call_id, + current_trace_data=current_trace_data, + instance=instance, + inputs=inputs, ) - self._opik_trace_data = trace_data + return + + self._initialize_span_and_trace_from_scratch( + call_id=call_id, + instance=instance, + inputs=inputs, + ) + + # trace_data = trace.TraceData( + # name=instance.__class__.__name__, + # metadata={"created_from": "dspy"}, + # project_name=self._project_name, + # ) + + def _attach_span_to_existing_span( + self, + call_id: str, + current_span_data: span.SpanData, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + project_name = helpers.resolve_child_span_project_name( + parent_project_name=current_span_data.project_name, + child_project_name=self._project_name, + ) + + span_data = span.SpanData( + trace_id=current_span_data.trace_id, + parent_span_id=current_span_data.id, + name=instance.__class__.__name__, + input=inputs, + project_name=project_name, + ) + self._map_call_id_to_span_data[call_id] = span_data + self._externally_created_traces_ids.add(span_data.trace_id) + + def _attach_span_to_existing_trace( + self, + call_id: str, + current_trace_data: trace.TraceData, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + project_name = helpers.resolve_child_span_project_name( + current_trace_data.project_name, + self._project_name, + ) + + span_data = span.SpanData( + trace_id=current_trace_data.id, + parent_span_id=None, + name=instance.__class__.__name__, + input=inputs, + project_name=project_name, + ) + self._current_context.set(span_data) + self._map_call_id_to_span_data[call_id] = span_data + self._externally_created_traces_ids.add(current_trace_data.id) + + def _initialize_span_and_trace_from_scratch( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ) -> None: + trace_data = trace.TraceData( + name=instance.__class__.__name__, + input=inputs, + project_name=self._project_name, + ) + + self._map_call_id_to_trace_data[call_id] = trace_data + + span_data = span.SpanData( + trace_id=trace_data.id, + parent_span_id=None, + name=instance.__class__.__name__, + input=inputs, + project_name=self._project_name, + ) + + self._map_call_id_to_span_data[call_id] = span_data def on_module_end( self, @@ -50,16 +154,16 @@ def on_module_end( ): print(f"on_module_end() is called with call_id: {call_id}, outputs: {outputs}, exception: {exception}") - if self._opik_trace_data is None: - return - - if exception: - error_info = error_info_collector.collect(exception) - self._opik_trace_data.update(error_info=error_info) - - self._opik_trace_data.init_end_time() - self._opik_client.trace(**self._opik_trace_data.__dict__) - self._opik_trace_data = None + # if self._opik_trace_data is None: + # return + # + # if exception: + # error_info = error_info_collector.collect(exception) + # self._opik_trace_data.update(error_info=error_info) + # + # self._opik_trace_data.init_end_time() + # self._opik_client.trace(**self._opik_trace_data.__dict__) + # self._opik_trace_data = None def on_lm_start( self, @@ -69,15 +173,24 @@ def on_lm_start( ): print(f"LM is called with inputs: {inputs}") + current_span_data = self._map_call_id_to_span_data.get(call_id) + # assert current_span_data is not None + + # todo handle provider+model + # elif isinstance(instance, dspy.Predict): + # return SpanType.LLM + span_data = span.SpanData( - trace_id=self._opik_trace_data.id, + trace_id=current_span_data.trace_id, name=instance.__class__.__name__, - parent_span_id=None, + parent_span_id=current_span_data.id, type="llm", input=inputs, - project_name=self._opik_trace_data.project_name, + # project_name=self._opik_trace_data.project_name, + # provider="openai", + # model="gpt-3.5-turbo", ) - context_storage.add_span_data(span_data) + # context_storage.add_span_data(span_data) def on_lm_end( @@ -88,12 +201,12 @@ def on_lm_end( ): print(f"LM is finished with outputs: {outputs}") - span_data = context_storage.pop_span_data() - - if exception: - error_info = error_info_collector.collect(exception) - span_data.update(error_info=error_info) - - span_data.update(output={"output": outputs}).init_end_time() - - self._opik_client.span(**span_data.__dict__) + # span_data = context_storage.pop_span_data() + # + # if exception: + # error_info = error_info_collector.collect(exception) + # span_data.update(error_info=error_info) + # + # span_data.update(output={"output": outputs}).init_end_time() + # + # self._opik_client.span(**span_data.__dict__) From c43aa354abb39888c6e6658275c36f0fc0c6862f Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Mon, 23 Dec 2024 18:24:38 +0000 Subject: [PATCH 03/13] wip --- .../src/opik/integrations/dspy/callback.py | 101 ++++++++++-------- 1 file changed, 56 insertions(+), 45 deletions(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index 210247e4e2..d7ef0346fd 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -1,33 +1,38 @@ -from contextvars import ContextVar -from typing import Any, Dict, Optional, Set +from contextvars import ContextVar, Token +from typing import Any, Dict, Optional from dspy.utils.callback import BaseCallback -from opik import context_storage, opik_context +from opik import opik_context from opik.api_objects import helpers, opik_client, span, trace from opik.decorator import error_info_collector - -_OPIK_CONTEXT = ContextVar("opik_context", default=None) +_OPIK_CONTEXT: ContextVar[Optional[span.SpanData]] = ContextVar("opik_context", default=None) class OpikCallback(BaseCallback): def __init__( self, + client: Optional[opik_client.Opik] = None, project_name: Optional[str] = None, ): self._map_call_id_to_span_data: Dict[str, span.SpanData] = {} self._map_call_id_to_trace_data: Dict[str, trace.TraceData] = {} - self._externally_created_traces_ids: Set[str] = set() + + self._map_span_id_to_token: Dict[str, Token] = {} self._current_context: ContextVar[Optional[span.SpanData]] = _OPIK_CONTEXT self._project_name = project_name - self._opik_client = opik_client.Opik( - _use_batching=True, - project_name=project_name, - ) + + if client: + self._opik_client = client + else: + self._opik_client = opik_client.Opik( + _use_batching=True, + project_name=project_name, + ) def on_module_start( self, @@ -47,7 +52,9 @@ def on_module_start( return if current_span_data := opik_context.get_current_span_data(): - self._current_context.set(current_span_data) + token = self._current_context.set(current_span_data) + self._map_span_id_to_token[current_span_data.id] = token + self._attach_span_to_existing_span( call_id=call_id, current_span_data=current_span_data, @@ -71,12 +78,6 @@ def on_module_start( inputs=inputs, ) - # trace_data = trace.TraceData( - # name=instance.__class__.__name__, - # metadata={"created_from": "dspy"}, - # project_name=self._project_name, - # ) - def _attach_span_to_existing_span( self, call_id: str, @@ -97,7 +98,6 @@ def _attach_span_to_existing_span( project_name=project_name, ) self._map_call_id_to_span_data[call_id] = span_data - self._externally_created_traces_ids.add(span_data.trace_id) def _attach_span_to_existing_trace( self, @@ -118,9 +118,10 @@ def _attach_span_to_existing_trace( input=inputs, project_name=project_name, ) - self._current_context.set(span_data) + token = self._current_context.set(span_data) + self._map_span_id_to_token[span_data.id] = token + self._map_call_id_to_span_data[call_id] = span_data - self._externally_created_traces_ids.add(current_trace_data.id) def _initialize_span_and_trace_from_scratch( self, @@ -131,6 +132,7 @@ def _initialize_span_and_trace_from_scratch( trace_data = trace.TraceData( name=instance.__class__.__name__, input=inputs, + metadata={"created_from": "dspy"}, project_name=self._project_name, ) @@ -143,6 +145,8 @@ def _initialize_span_and_trace_from_scratch( input=inputs, project_name=self._project_name, ) + token = self._current_context.set(span_data) + self._map_span_id_to_token[span_data.id] = token self._map_call_id_to_span_data[call_id] = span_data @@ -154,16 +158,23 @@ def on_module_end( ): print(f"on_module_end() is called with call_id: {call_id}, outputs: {outputs}, exception: {exception}") - # if self._opik_trace_data is None: - # return - # - # if exception: - # error_info = error_info_collector.collect(exception) - # self._opik_trace_data.update(error_info=error_info) - # - # self._opik_trace_data.init_end_time() - # self._opik_client.trace(**self._opik_trace_data.__dict__) - # self._opik_trace_data = None + span_data = self._map_call_id_to_span_data.pop(call_id) + trace_data = self._map_call_id_to_trace_data.pop(call_id, None) + + if exception: + error_info = error_info_collector.collect(exception) + span_data.update(error_info=error_info) + + span_data.update(output={"output": outputs}).init_end_time() + self._opik_client.span(**span_data.__dict__) + + # remove span data from context + if token := self._map_span_id_to_token.pop(span_data.id, None): + self._current_context.reset(token) + + if trace_data is not None: + self._opik_client.trace(**trace_data.__dict__) + def on_lm_start( self, @@ -173,24 +184,24 @@ def on_lm_start( ): print(f"LM is called with inputs: {inputs}") - current_span_data = self._map_call_id_to_span_data.get(call_id) - # assert current_span_data is not None + current_context = self._current_context.get() + assert current_context is not None # todo handle provider+model # elif isinstance(instance, dspy.Predict): # return SpanType.LLM span_data = span.SpanData( - trace_id=current_span_data.trace_id, + trace_id=current_context.trace_id, name=instance.__class__.__name__, - parent_span_id=current_span_data.id, + parent_span_id=current_context.id, type="llm", input=inputs, - # project_name=self._opik_trace_data.project_name, + project_name=current_context.project_name, # provider="openai", # model="gpt-3.5-turbo", ) - # context_storage.add_span_data(span_data) + self._map_call_id_to_span_data[call_id] = span_data def on_lm_end( @@ -201,12 +212,12 @@ def on_lm_end( ): print(f"LM is finished with outputs: {outputs}") - # span_data = context_storage.pop_span_data() - # - # if exception: - # error_info = error_info_collector.collect(exception) - # span_data.update(error_info=error_info) - # - # span_data.update(output={"output": outputs}).init_end_time() - # - # self._opik_client.span(**span_data.__dict__) + span_data = self._map_call_id_to_span_data.pop(call_id) + + if exception: + error_info = error_info_collector.collect(exception) + span_data.update(error_info=error_info) + + span_data.update(output={"output": outputs}).init_end_time() + + self._opik_client.span(**span_data.__dict__) From b39fa322f7e875be0c047a897d449462a232f85f Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Mon, 23 Dec 2024 19:41:07 +0000 Subject: [PATCH 04/13] wip --- sdks/python/src/opik/integrations/dspy/callback.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index d7ef0346fd..cb8df07a8b 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -173,6 +173,7 @@ def on_module_end( self._current_context.reset(token) if trace_data is not None: + trace_data.init_end_time() self._opik_client.trace(**trace_data.__dict__) From 489769c2c4d42462f5c1ad7a6bb44c6e0010f895 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Tue, 24 Dec 2024 12:25:15 +0000 Subject: [PATCH 05/13] wip --- .../src/opik/integrations/dspy/callback.py | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index cb8df07a8b..e44b498152 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -7,32 +7,25 @@ from opik.api_objects import helpers, opik_client, span, trace from opik.decorator import error_info_collector -_OPIK_CONTEXT: ContextVar[Optional[span.SpanData]] = ContextVar("opik_context", default=None) - class OpikCallback(BaseCallback): def __init__( self, - client: Optional[opik_client.Opik] = None, project_name: Optional[str] = None, ): self._map_call_id_to_span_data: Dict[str, span.SpanData] = {} self._map_call_id_to_trace_data: Dict[str, trace.TraceData] = {} - self._map_span_id_to_token: Dict[str, Token] = {} - self._current_context: ContextVar[Optional[span.SpanData]] = _OPIK_CONTEXT + self._current_callback_context: ContextVar[Optional[span.SpanData]] = ContextVar("opik_context", default=None) self._project_name = project_name - if client: - self._opik_client = client - else: - self._opik_client = opik_client.Opik( - _use_batching=True, - project_name=project_name, - ) + self._opik_client = opik_client.Opik( + project_name=project_name, + _use_batching=True, + ) def on_module_start( self, @@ -40,27 +33,28 @@ def on_module_start( instance: Any, inputs: Dict[str, Any], ) -> None: - print(f"on_module_start() is called with call_id: {call_id}, instance: {instance.__class__.__name__}, inputs: {inputs}") + print( + f"on_module_start() is called with call_id: {call_id}, instance: {instance.__class__.__name__}") - if current_context := self._current_context.get(): + if current_callback_context_data := self._current_callback_context.get(): self._attach_span_to_existing_span( call_id=call_id, - current_span_data=current_context, + current_span_data=current_callback_context_data, instance=instance, inputs=inputs, ) return if current_span_data := opik_context.get_current_span_data(): - token = self._current_context.set(current_span_data) - self._map_span_id_to_token[current_span_data.id] = token - self._attach_span_to_existing_span( call_id=call_id, current_span_data=current_span_data, instance=instance, inputs=inputs, ) + new_span_data = self._map_call_id_to_span_data[call_id] + self._callback_context_set(new_span_data) + return if current_trace_data := opik_context.get_current_trace_data(): @@ -118,9 +112,7 @@ def _attach_span_to_existing_trace( input=inputs, project_name=project_name, ) - token = self._current_context.set(span_data) - self._map_span_id_to_token[span_data.id] = token - + self._callback_context_set(span_data) self._map_call_id_to_span_data[call_id] = span_data def _initialize_span_and_trace_from_scratch( @@ -135,7 +127,6 @@ def _initialize_span_and_trace_from_scratch( metadata={"created_from": "dspy"}, project_name=self._project_name, ) - self._map_call_id_to_trace_data[call_id] = trace_data span_data = span.SpanData( @@ -145,9 +136,7 @@ def _initialize_span_and_trace_from_scratch( input=inputs, project_name=self._project_name, ) - token = self._current_context.set(span_data) - self._map_span_id_to_token[span_data.id] = token - + self._callback_context_set(span_data) self._map_call_id_to_span_data[call_id] = span_data def on_module_end( @@ -170,13 +159,12 @@ def on_module_end( # remove span data from context if token := self._map_span_id_to_token.pop(span_data.id, None): - self._current_context.reset(token) + self._current_callback_context.reset(token) if trace_data is not None: trace_data.init_end_time() self._opik_client.trace(**trace_data.__dict__) - def on_lm_start( self, call_id: str, @@ -185,7 +173,7 @@ def on_lm_start( ): print(f"LM is called with inputs: {inputs}") - current_context = self._current_context.get() + current_context = self._current_callback_context.get() assert current_context is not None # todo handle provider+model @@ -204,7 +192,6 @@ def on_lm_start( ) self._map_call_id_to_span_data[call_id] = span_data - def on_lm_end( self, call_id: str, @@ -222,3 +209,10 @@ def on_lm_end( span_data.update(output={"output": outputs}).init_end_time() self._opik_client.span(**span_data.__dict__) + + def flush(self) -> None: + self._opik_client.flush() + + def _callback_context_set(self, value: span.SpanData) -> None: + token = self._current_callback_context.set(value) + self._map_span_id_to_token[value.id] = token From 195629494537efd8979b85cd58bb1a91ccdb4cf0 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Tue, 24 Dec 2024 14:09:06 +0000 Subject: [PATCH 06/13] new UI representation --- .../src/opik/integrations/dspy/callback.py | 134 +++++++++++------- 1 file changed, 79 insertions(+), 55 deletions(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index e44b498152..e08e519982 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -1,12 +1,15 @@ from contextvars import ContextVar, Token -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union from dspy.utils.callback import BaseCallback from opik import opik_context -from opik.api_objects import helpers, opik_client, span, trace +from opik.api_objects import helpers, span, trace +from opik.api_objects.opik_client import get_client_cached from opik.decorator import error_info_collector +ContextType = Union[span.SpanData, trace.TraceData] + class OpikCallback(BaseCallback): @@ -16,16 +19,13 @@ def __init__( ): self._map_call_id_to_span_data: Dict[str, span.SpanData] = {} self._map_call_id_to_trace_data: Dict[str, trace.TraceData] = {} - self._map_span_id_to_token: Dict[str, Token] = {} + self._map_span_id_or_trace_id_to_token: Dict[str, Token] = {} - self._current_callback_context: ContextVar[Optional[span.SpanData]] = ContextVar("opik_context", default=None) + self._current_callback_context: ContextVar[Optional[ContextType]] = ContextVar("opik_context", default=None) self._project_name = project_name - self._opik_client = opik_client.Opik( - project_name=project_name, - _use_batching=True, - ) + self._opik_client = get_client_cached() def on_module_start( self, @@ -33,16 +33,23 @@ def on_module_start( instance: Any, inputs: Dict[str, Any], ) -> None: - print( - f"on_module_start() is called with call_id: {call_id}, instance: {instance.__class__.__name__}") + print(f"*** on_module_start() is called with call_id: {call_id}, instance: {instance.__class__.__name__}") if current_callback_context_data := self._current_callback_context.get(): - self._attach_span_to_existing_span( - call_id=call_id, - current_span_data=current_callback_context_data, - instance=instance, - inputs=inputs, - ) + if isinstance(current_callback_context_data, span.SpanData): + self._attach_span_to_existing_span( + call_id=call_id, + current_span_data=current_callback_context_data, + instance=instance, + inputs=inputs, + ) + else: + self._attach_span_to_existing_trace( + call_id=call_id, + current_trace_data=current_callback_context_data, + instance=instance, + inputs=inputs, + ) return if current_span_data := opik_context.get_current_span_data(): @@ -54,7 +61,6 @@ def on_module_start( ) new_span_data = self._map_call_id_to_span_data[call_id] self._callback_context_set(new_span_data) - return if current_trace_data := opik_context.get_current_trace_data(): @@ -64,9 +70,11 @@ def on_module_start( instance=instance, inputs=inputs, ) + new_trace_data = self._map_call_id_to_trace_data[call_id] + self._callback_context_set(new_trace_data) return - self._initialize_span_and_trace_from_scratch( + self._start_trace( call_id=call_id, instance=instance, inputs=inputs, @@ -112,10 +120,9 @@ def _attach_span_to_existing_trace( input=inputs, project_name=project_name, ) - self._callback_context_set(span_data) self._map_call_id_to_span_data[call_id] = span_data - def _initialize_span_and_trace_from_scratch( + def _start_trace( self, call_id: str, instance: Any, @@ -128,16 +135,7 @@ def _initialize_span_and_trace_from_scratch( project_name=self._project_name, ) self._map_call_id_to_trace_data[call_id] = trace_data - - span_data = span.SpanData( - trace_id=trace_data.id, - parent_span_id=None, - name=instance.__class__.__name__, - input=inputs, - project_name=self._project_name, - ) - self._callback_context_set(span_data) - self._map_call_id_to_span_data[call_id] = span_data + self._callback_context_set(trace_data) def on_module_end( self, @@ -145,10 +143,31 @@ def on_module_end( outputs: Optional[Any], exception: Optional[Exception] = None, ): - print(f"on_module_end() is called with call_id: {call_id}, outputs: {outputs}, exception: {exception}") + print(f"*** on_module_end() is called with call_id: {call_id}, outputs: {outputs}, exception: {exception}") + + self._end_span( + call_id=call_id, + exception=exception, + outputs=outputs, + ) + self._end_trace(call_id=call_id) + + def _end_trace(self, call_id: str) -> None: + if trace_data := self._map_call_id_to_trace_data.pop(call_id, None): + trace_data.init_end_time() + self._opik_client.trace(**trace_data.__dict__) + + # remove trace data from context + if token := self._map_span_id_or_trace_id_to_token.pop(trace_data.id, None): + self._current_callback_context.reset(token) + def _end_span( + self, + call_id: str, + outputs: Optional[Any], + exception: Optional[Exception] = None, + ) -> None: span_data = self._map_call_id_to_span_data.pop(call_id) - trace_data = self._map_call_id_to_trace_data.pop(call_id, None) if exception: error_info = error_info_collector.collect(exception) @@ -158,35 +177,43 @@ def on_module_end( self._opik_client.span(**span_data.__dict__) # remove span data from context - if token := self._map_span_id_to_token.pop(span_data.id, None): + if token := self._map_span_id_or_trace_id_to_token.pop(span_data.id, None): self._current_callback_context.reset(token) - if trace_data is not None: - trace_data.init_end_time() - self._opik_client.trace(**trace_data.__dict__) - def on_lm_start( self, call_id: str, instance: Any, inputs: Dict[str, Any], ): - print(f"LM is called with inputs: {inputs}") + print(f"*** LM is called with inputs: {inputs}") + + current_callback_context_data = self._current_callback_context.get() + assert current_callback_context_data is not None + + project_name = helpers.resolve_child_span_project_name( + current_callback_context_data.project_name, + self._project_name, + ) - current_context = self._current_callback_context.get() - assert current_context is not None + if isinstance(current_callback_context_data, span.SpanData): + trace_id = current_callback_context_data.trace_id + parent_span_id = current_callback_context_data.id + else: + trace_id = current_callback_context_data.id + parent_span_id = None # todo handle provider+model # elif isinstance(instance, dspy.Predict): # return SpanType.LLM span_data = span.SpanData( - trace_id=current_context.trace_id, + trace_id=trace_id, name=instance.__class__.__name__, - parent_span_id=current_context.id, + parent_span_id=parent_span_id, type="llm", input=inputs, - project_name=current_context.project_name, + project_name=project_name, # provider="openai", # model="gpt-3.5-turbo", ) @@ -198,21 +225,18 @@ def on_lm_end( outputs: Optional[Dict[str, Any]], exception: Optional[Exception] = None, ): - print(f"LM is finished with outputs: {outputs}") - - span_data = self._map_call_id_to_span_data.pop(call_id) - - if exception: - error_info = error_info_collector.collect(exception) - span_data.update(error_info=error_info) - - span_data.update(output={"output": outputs}).init_end_time() + print(f"*** LM is finished with outputs: {outputs}") - self._opik_client.span(**span_data.__dict__) + self._end_span( + call_id=call_id, + exception=exception, + outputs=outputs, + ) + self._end_trace(call_id=call_id) def flush(self) -> None: self._opik_client.flush() - def _callback_context_set(self, value: span.SpanData) -> None: + def _callback_context_set(self, value: ContextType) -> None: token = self._current_callback_context.set(value) - self._map_span_id_to_token[value.id] = token + self._map_span_id_or_trace_id_to_token[value.id] = token From a0e57b5658e6555f3787671c9bb8c53db4116d80 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Tue, 24 Dec 2024 14:43:17 +0000 Subject: [PATCH 07/13] new UI representation fix --- .../src/opik/integrations/dspy/callback.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index e08e519982..cb61fc5dab 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -167,18 +167,17 @@ def _end_span( outputs: Optional[Any], exception: Optional[Exception] = None, ) -> None: - span_data = self._map_call_id_to_span_data.pop(call_id) + if span_data := self._map_call_id_to_span_data.pop(call_id, None): + if exception: + error_info = error_info_collector.collect(exception) + span_data.update(error_info=error_info) - if exception: - error_info = error_info_collector.collect(exception) - span_data.update(error_info=error_info) + span_data.update(output={"output": outputs}).init_end_time() + self._opik_client.span(**span_data.__dict__) - span_data.update(output={"output": outputs}).init_end_time() - self._opik_client.span(**span_data.__dict__) - - # remove span data from context - if token := self._map_span_id_or_trace_id_to_token.pop(span_data.id, None): - self._current_callback_context.reset(token) + # remove span data from context + if token := self._map_span_id_or_trace_id_to_token.pop(span_data.id, None): + self._current_callback_context.reset(token) def on_lm_start( self, From 39d55f75974238c92289f11c9b21d938b8522627 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Tue, 24 Dec 2024 15:08:11 +0000 Subject: [PATCH 08/13] detect provider+model --- sdks/python/src/opik/integrations/dspy/callback.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index cb61fc5dab..0d8ed00aea 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -202,9 +202,8 @@ def on_lm_start( trace_id = current_callback_context_data.id parent_span_id = None - # todo handle provider+model - # elif isinstance(instance, dspy.Predict): - # return SpanType.LLM + provider, model = instance.model.split(r"/", 1) + span_type = self._get_span_type(instance) span_data = span.SpanData( trace_id=trace_id, @@ -213,8 +212,8 @@ def on_lm_start( type="llm", input=inputs, project_name=project_name, - # provider="openai", - # model="gpt-3.5-turbo", + provider=provider, + model=model, ) self._map_call_id_to_span_data[call_id] = span_data From a2edb9bba728662375d0b918f6a225bf7638c6c8 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Tue, 24 Dec 2024 15:09:05 +0000 Subject: [PATCH 09/13] detect span type --- sdks/python/src/opik/integrations/dspy/callback.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index 0d8ed00aea..0233f16963 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -1,6 +1,7 @@ from contextvars import ContextVar, Token from typing import Any, Dict, Optional, Union +import dspy from dspy.utils.callback import BaseCallback from opik import opik_context @@ -91,12 +92,14 @@ def _attach_span_to_existing_span( parent_project_name=current_span_data.project_name, child_project_name=self._project_name, ) + span_type = self._get_span_type(instance) span_data = span.SpanData( trace_id=current_span_data.trace_id, parent_span_id=current_span_data.id, name=instance.__class__.__name__, input=inputs, + type=span_type, project_name=project_name, ) self._map_call_id_to_span_data[call_id] = span_data @@ -112,12 +115,14 @@ def _attach_span_to_existing_trace( current_trace_data.project_name, self._project_name, ) + span_type = self._get_span_type(instance) span_data = span.SpanData( trace_id=current_trace_data.id, parent_span_id=None, name=instance.__class__.__name__, input=inputs, + type=span_type, project_name=project_name, ) self._map_call_id_to_span_data[call_id] = span_data @@ -209,7 +214,7 @@ def on_lm_start( trace_id=trace_id, name=instance.__class__.__name__, parent_span_id=parent_span_id, - type="llm", + type=span_type, input=inputs, project_name=project_name, provider=provider, @@ -238,3 +243,10 @@ def flush(self) -> None: def _callback_context_set(self, value: ContextType) -> None: token = self._current_callback_context.set(value) self._map_span_id_or_trace_id_to_token[value.id] = token + + def _get_span_type(self, instance: Any) -> span.SpanType: + if isinstance(instance, dspy.Predict): + return "llm" + elif isinstance(instance, dspy.LM): + return "llm" + return "general" From 965ee7b385d15cfdee92c8e27ca5e215bcb20bed Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Tue, 24 Dec 2024 15:13:45 +0000 Subject: [PATCH 10/13] fix linter --- .../src/opik/integrations/dspy/callback.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index 0233f16963..6dbe07c290 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -13,7 +13,6 @@ class OpikCallback(BaseCallback): - def __init__( self, project_name: Optional[str] = None, @@ -22,7 +21,9 @@ def __init__( self._map_call_id_to_trace_data: Dict[str, trace.TraceData] = {} self._map_span_id_or_trace_id_to_token: Dict[str, Token] = {} - self._current_callback_context: ContextVar[Optional[ContextType]] = ContextVar("opik_context", default=None) + self._current_callback_context: ContextVar[Optional[ContextType]] = ContextVar( + "opik_context", default=None + ) self._project_name = project_name @@ -34,8 +35,6 @@ def on_module_start( instance: Any, inputs: Dict[str, Any], ) -> None: - print(f"*** on_module_start() is called with call_id: {call_id}, instance: {instance.__class__.__name__}") - if current_callback_context_data := self._current_callback_context.get(): if isinstance(current_callback_context_data, span.SpanData): self._attach_span_to_existing_span( @@ -147,9 +146,7 @@ def on_module_end( call_id: str, outputs: Optional[Any], exception: Optional[Exception] = None, - ): - print(f"*** on_module_end() is called with call_id: {call_id}, outputs: {outputs}, exception: {exception}") - + ) -> None: self._end_span( call_id=call_id, exception=exception, @@ -189,9 +186,7 @@ def on_lm_start( call_id: str, instance: Any, inputs: Dict[str, Any], - ): - print(f"*** LM is called with inputs: {inputs}") - + ) -> None: current_callback_context_data = self._current_callback_context.get() assert current_callback_context_data is not None @@ -227,9 +222,7 @@ def on_lm_end( call_id: str, outputs: Optional[Dict[str, Any]], exception: Optional[Exception] = None, - ): - print(f"*** LM is finished with outputs: {outputs}") - + ) -> None: self._end_span( call_id=call_id, exception=exception, From 43b75293c566f97dff90b9d1163f0abb966b5d86 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Tue, 24 Dec 2024 16:58:52 +0000 Subject: [PATCH 11/13] add integration tests --- .github/workflows/lib-dspy-tests.yml | 51 ++ .../lib-integration-tests-runner.yml | 7 + .../src/opik/integrations/dspy/callback.py | 5 +- .../library_integration/dspy/__init__.py | 0 .../library_integration/dspy/requirements.txt | 1 + .../library_integration/dspy/test_dspy.py | 447 ++++++++++++++++++ 6 files changed, 509 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/lib-dspy-tests.yml create mode 100644 sdks/python/tests/library_integration/dspy/__init__.py create mode 100644 sdks/python/tests/library_integration/dspy/requirements.txt create mode 100644 sdks/python/tests/library_integration/dspy/test_dspy.py diff --git a/.github/workflows/lib-dspy-tests.yml b/.github/workflows/lib-dspy-tests.yml new file mode 100644 index 0000000000..1b6f6c548c --- /dev/null +++ b/.github/workflows/lib-dspy-tests.yml @@ -0,0 +1,51 @@ +# Workflow to run DSPy tests +# +# Please read inputs to provide correct values. +# +name: SDK Lib DSPy Tests +run-name: "SDK Lib DSPy Tests ${{ github.ref_name }} by @${{ github.actor }}" +env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + OPENAI_ORG_ID: ${{ secrets.OPENAI_ORG_ID }} +on: + workflow_call: + +jobs: + tests: + name: DSPy Python ${{matrix.python_version}} + runs-on: ubuntu-latest + defaults: + run: + working-directory: sdks/python + + strategy: + fail-fast: true + matrix: + python_version: ["3.9", "3.10", "3.11", "3.12"] + + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Setup Python ${{matrix.python_version}} + uses: actions/setup-python@v5 + with: + python-version: ${{matrix.python_version}} + + - name: Install opik + run: pip install . + + - name: Install test tools + run: | + cd ./tests + pip install --no-cache-dir --disable-pip-version-check -r test_requirements.txt + + - name: Install lib + run: | + cd ./tests + pip install --no-cache-dir --disable-pip-version-check -r library_integration/dspy/requirements.txt + + - name: Run tests + run: | + cd ./tests/library_integration/dspy/ + python -m pytest -vv . \ No newline at end of file diff --git a/.github/workflows/lib-integration-tests-runner.yml b/.github/workflows/lib-integration-tests-runner.yml index 8885dab222..4bf3d12b7e 100644 --- a/.github/workflows/lib-integration-tests-runner.yml +++ b/.github/workflows/lib-integration-tests-runner.yml @@ -18,6 +18,7 @@ on: - aisuite - haystack - guardrails + - dspy schedule: - cron: "0 0 */1 * *" pull_request: @@ -87,3 +88,9 @@ jobs: if: contains(fromJSON('["guardrails", "all"]'), needs.init_environment.outputs.LIBS) uses: ./.github/workflows/lib-guardrails-tests.yml secrets: inherit + + dspy_tests: + needs: [init_environment] + if: contains(fromJSON('["dspy", "all"]'), needs.init_environment.outputs.LIBS) + uses: ./.github/workflows/lib-dspy-tests.yml + secrets: inherit diff --git a/sdks/python/src/opik/integrations/dspy/callback.py b/sdks/python/src/opik/integrations/dspy/callback.py index 6dbe07c290..52b8fdc340 100644 --- a/sdks/python/src/opik/integrations/dspy/callback.py +++ b/sdks/python/src/opik/integrations/dspy/callback.py @@ -70,8 +70,8 @@ def on_module_start( instance=instance, inputs=inputs, ) - new_trace_data = self._map_call_id_to_trace_data[call_id] - self._callback_context_set(new_trace_data) + new_span_data = self._map_call_id_to_span_data[call_id] + self._callback_context_set(new_span_data) return self._start_trace( @@ -231,6 +231,7 @@ def on_lm_end( self._end_trace(call_id=call_id) def flush(self) -> None: + """Sends pending Opik data to the backend""" self._opik_client.flush() def _callback_context_set(self, value: ContextType) -> None: diff --git a/sdks/python/tests/library_integration/dspy/__init__.py b/sdks/python/tests/library_integration/dspy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/tests/library_integration/dspy/requirements.txt b/sdks/python/tests/library_integration/dspy/requirements.txt new file mode 100644 index 0000000000..65525d8942 --- /dev/null +++ b/sdks/python/tests/library_integration/dspy/requirements.txt @@ -0,0 +1 @@ +dspy diff --git a/sdks/python/tests/library_integration/dspy/test_dspy.py b/sdks/python/tests/library_integration/dspy/test_dspy.py new file mode 100644 index 0000000000..6c1b755b92 --- /dev/null +++ b/sdks/python/tests/library_integration/dspy/test_dspy.py @@ -0,0 +1,447 @@ +import dspy +import pytest + +import opik +from opik import context_storage +from opik.api_objects import opik_client, span, trace +from opik.config import OPIK_PROJECT_DEFAULT_NAME +from opik.integrations.dspy.callback import OpikCallback +from ...testlib import ( + ANY_BUT_NONE, + ANY_DICT, + ANY_STRING, + SpanModel, + TraceModel, + assert_equal, +) + + +@pytest.mark.parametrize( + "project_name, expected_project_name", + [ + (None, OPIK_PROJECT_DEFAULT_NAME), + ("dspy-integration-test", "dspy-integration-test"), + ], +) +def test_dspy__happyflow( + fake_backend, + project_name, + expected_project_name, +): + lm = dspy.LM( + model="openai/gpt-4o-mini", + ) + dspy.configure(lm=lm) + + opik_callback = OpikCallback(project_name=project_name) + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_STRING(), + name="ChainOfThought", + input={"args": (), "kwargs": {"question": "What is the meaning of life?"}}, + output=None, + metadata={"created_from": "dspy"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-4o-mini", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[], + ), + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 2 + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy__openai_llm_is_used__error_occurred_during_openai_call__error_info_is_logged( + fake_backend, +): + lm = dspy.LM( + model="openai/gpt-3.5-turbo", + api_key="incorrect-api-key", + ) + dspy.configure(lm=lm) + + project_name = "dspy-integration-test" + opik_callback = OpikCallback(project_name=project_name) + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + + with pytest.raises(Exception): + cot(question="What is the meaning of life?") + + opik_callback.flush() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_STRING(), + name="ChainOfThought", + input={"args": (), "kwargs": {"question": "What is the meaning of life?"}}, + output=None, + metadata={"created_from": "dspy"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + error_info={ + "exception_type": ANY_STRING(), + "message": ANY_STRING(), + "traceback": ANY_STRING(), + }, + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + error_info={ + "exception_type": ANY_STRING(), + "message": ANY_STRING(), + "traceback": ANY_STRING(), + }, + ), + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 2 + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy_callback__used_inside_another_track_function__data_attached_to_existing_trace_tree( + fake_backend, +): + project_name = "dspy-integration-test" + + @opik.track(project_name=project_name, capture_output=True) + def f(x): + lm = dspy.LM( + model="openai/gpt-3.5-turbo", + ) + dspy.configure(lm=lm) + + opik_callback = OpikCallback(project_name=project_name) + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + return "the-output" + + f("the-input") + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="f", + input={"x": "the-input"}, + output={"output": "the-output"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name="f", + type="general", + input={"x": "the-input"}, + output={"output": "the-output"}, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + name="ChainOfThought", + input={ + "args": (), + "kwargs": {"question": "What is the meaning of life?"}, + }, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=project_name, + spans=[], + ), + ], + ) + ], + ) + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 1 + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy_callback__used_when_there_was_already_existing_trace_without_span__data_attached_to_existing_trace( + fake_backend, +): + def f(): + lm = dspy.LM( + model="openai/gpt-3.5-turbo", + ) + dspy.configure(lm=lm) + + opik_callback = OpikCallback() + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + client = opik_client.get_client_cached() + + # Prepare context to have manually created trace data + trace_data = trace.TraceData( + name="manually-created-trace", + input={"input": "input-of-manually-created-trace"}, + ) + context_storage.set_trace_data(trace_data) + + f() + + # Send trace data + trace_data = context_storage.pop_trace_data() + trace_data.init_end_time().update( + output={"output": "output-of-manually-created-trace"} + ) + client.trace(**trace_data.__dict__) + + opik.flush_tracker() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_STRING(), + name="manually-created-trace", + input={"input": "input-of-manually-created-trace"}, + output={"output": "output-of-manually-created-trace"}, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[ + SpanModel( + id=ANY_STRING(), + name="ChainOfThought", + input={ + "args": (), + "kwargs": {"question": "What is the meaning of life?"}, + }, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + ], + ) + ], + ) + + assert len(fake_backend.trace_trees) == 1 + assert len(fake_backend.span_trees) == 1 + + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) + + +def test_dspy_callback__used_when_there_was_already_existing_span_without_trace__data_attached_to_existing_span( + fake_backend, +): + def f(): + lm = dspy.LM( + model="openai/gpt-3.5-turbo", + ) + dspy.configure(lm=lm) + + opik_callback = OpikCallback() + dspy.settings.configure(callbacks=[opik_callback]) + + cot = dspy.ChainOfThought("question -> answer") + cot(question="What is the meaning of life?") + + opik_callback.flush() + + client = opik_client.get_client_cached() + span_data = span.SpanData( + trace_id="some-trace-id", + name="manually-created-span", + input={"input": "input-of-manually-created-span"}, + ) + context_storage.add_span_data(span_data) + + f() + + span_data = context_storage.pop_span_data() + span_data.init_end_time().update( + output={"output": "output-of-manually-created-span"} + ) + client.span(**span_data.__dict__) + opik.flush_tracker() + + EXPECTED_SPANS_TREE = SpanModel( + id=ANY_STRING(), + name="manually-created-span", + input={"input": "input-of-manually-created-span"}, + output={"output": "output-of-manually-created-span"}, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[ + SpanModel( + id=ANY_STRING(), + name="ChainOfThought", + input={ + "args": (), + "kwargs": {"question": "What is the meaning of life?"}, + }, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=OPIK_PROJECT_DEFAULT_NAME, + spans=[ + SpanModel( + id=ANY_STRING(), + type="llm", + name="LM", + provider="openai", + model="gpt-3.5-turbo", + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + SpanModel( + id=ANY_STRING(), + type="llm", + name="Predict", + provider=None, + model=None, + input=ANY_DICT, + output=ANY_DICT, + metadata=None, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + spans=[], + ), + ], + ) + ], + ) + + assert len(fake_backend.span_trees) == 1 + assert_equal(EXPECTED_SPANS_TREE, fake_backend.span_trees[0]) From 9d6bc9219376b242f39703bc65003e72e83639a8 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Thu, 26 Dec 2024 14:57:03 +0000 Subject: [PATCH 12/13] handle changing spans order --- .../library_integration/dspy/test_dspy.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sdks/python/tests/library_integration/dspy/test_dspy.py b/sdks/python/tests/library_integration/dspy/test_dspy.py index 6c1b755b92..6d6cbb624a 100644 --- a/sdks/python/tests/library_integration/dspy/test_dspy.py +++ b/sdks/python/tests/library_integration/dspy/test_dspy.py @@ -1,3 +1,5 @@ +from typing import Union + import dspy import pytest @@ -16,6 +18,14 @@ ) +def sort_spans_by_name(tree: Union[SpanModel, TraceModel]) -> None: + """ + Sorts the spans within a trace/span tree by their names in ascending order. + """ + tree.spans = sorted(tree.spans, key=lambda span: span.name) + + + @pytest.mark.parametrize( "project_name, expected_project_name", [ @@ -84,6 +94,10 @@ def test_dspy__happyflow( assert len(fake_backend.trace_trees) == 1 assert len(fake_backend.span_trees) == 2 + + sort_spans_by_name(EXPECTED_TRACE_TREE) + sort_spans_by_name(fake_backend.trace_trees[0]) + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) @@ -160,6 +174,10 @@ def test_dspy__openai_llm_is_used__error_occurred_during_openai_call__error_info assert len(fake_backend.trace_trees) == 1 assert len(fake_backend.span_trees) == 2 + + sort_spans_by_name(EXPECTED_TRACE_TREE) + sort_spans_by_name(fake_backend.trace_trees[0]) + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) @@ -257,6 +275,10 @@ def f(x): assert len(fake_backend.trace_trees) == 1 assert len(fake_backend.span_trees) == 1 + + sort_spans_by_name(EXPECTED_TRACE_TREE.spans[0].spans[0]) + sort_spans_by_name(fake_backend.trace_trees[0].spans[0].spans[0]) + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) @@ -353,6 +375,9 @@ def f(): assert len(fake_backend.trace_trees) == 1 assert len(fake_backend.span_trees) == 1 + sort_spans_by_name(EXPECTED_TRACE_TREE.spans[0]) + sort_spans_by_name(fake_backend.trace_trees[0].spans[0]) + assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0]) @@ -444,4 +469,8 @@ def f(): ) assert len(fake_backend.span_trees) == 1 + + sort_spans_by_name(EXPECTED_SPANS_TREE.spans[0]) + sort_spans_by_name(fake_backend.span_trees[0].spans[0]) + assert_equal(EXPECTED_SPANS_TREE, fake_backend.span_trees[0]) From a5f1cd0da52718df3ce8d0dc34602268b01de382 Mon Sep 17 00:00:00 2001 From: Alexander Barannikov Date: Thu, 26 Dec 2024 15:22:33 +0000 Subject: [PATCH 13/13] handle api_key for dspy tests --- sdks/python/tests/library_integration/dspy/test_dspy.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/tests/library_integration/dspy/test_dspy.py b/sdks/python/tests/library_integration/dspy/test_dspy.py index 6d6cbb624a..02d0516bda 100644 --- a/sdks/python/tests/library_integration/dspy/test_dspy.py +++ b/sdks/python/tests/library_integration/dspy/test_dspy.py @@ -25,7 +25,6 @@ def sort_spans_by_name(tree: Union[SpanModel, TraceModel]) -> None: tree.spans = sorted(tree.spans, key=lambda span: span.name) - @pytest.mark.parametrize( "project_name, expected_project_name", [ @@ -39,6 +38,7 @@ def test_dspy__happyflow( expected_project_name, ): lm = dspy.LM( + cache=False, model="openai/gpt-4o-mini", ) dspy.configure(lm=lm) @@ -105,6 +105,7 @@ def test_dspy__openai_llm_is_used__error_occurred_during_openai_call__error_info fake_backend, ): lm = dspy.LM( + cache=False, model="openai/gpt-3.5-turbo", api_key="incorrect-api-key", ) @@ -189,6 +190,7 @@ def test_dspy_callback__used_inside_another_track_function__data_attached_to_exi @opik.track(project_name=project_name, capture_output=True) def f(x): lm = dspy.LM( + cache=False, model="openai/gpt-3.5-turbo", ) dspy.configure(lm=lm) @@ -287,6 +289,7 @@ def test_dspy_callback__used_when_there_was_already_existing_trace_without_span_ ): def f(): lm = dspy.LM( + cache=False, model="openai/gpt-3.5-turbo", ) dspy.configure(lm=lm) @@ -386,6 +389,7 @@ def test_dspy_callback__used_when_there_was_already_existing_span_without_trace_ ): def f(): lm = dspy.LM( + cache=False, model="openai/gpt-3.5-turbo", ) dspy.configure(lm=lm)