Skip to content

Commit

Permalink
feat(py): Add multipart feedback ingestion (#1129)
Browse files Browse the repository at this point in the history
When multipart is enabled, use the same tracing_queue and multipart endpoint
to send feedback as well.
  • Loading branch information
akira authored Oct 29, 2024
2 parents a0f99d9 + 30d402a commit 6dcd332
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 81 deletions.
5 changes: 4 additions & 1 deletion python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ def _tracing_thread_handle_batch(
) -> None:
create = [it.item for it in batch if it.action == "create"]
update = [it.item for it in batch if it.action == "update"]
feedback = [it.item for it in batch if it.action == "feedback"]
try:
if use_multipart:
client.multipart_ingest_runs(create=create, update=update, pre_sampled=True)
client.multipart_ingest(
create=create, update=update, feedback=feedback, pre_sampled=True
)
else:
client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
except Exception:
Expand Down
182 changes: 121 additions & 61 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,34 @@ def _run_transform(

return run_create

def _feedback_transform(
self,
feedback: Union[ls_schemas.Feedback, dict],
) -> dict:
"""Transform the given feedback object into a dictionary representation.
Args:
feedback (Union[ls_schemas.Feedback, dict]): The feedback object to transform.
update (bool, optional): Whether the payload is for an "update" event.
copy (bool, optional): Whether to deepcopy feedback inputs/outputs.
attachments_collector (Optional[dict[str, ls_schemas.Attachments]]):
A dictionary to collect attachments. If not passed, attachments
will be dropped.
Returns:
dict: The transformed feedback object as a dictionary.
"""
if hasattr(feedback, "dict") and callable(getattr(feedback, "dict")):
feedback_create: dict = feedback.dict() # type: ignore
else:
feedback_create = cast(dict, feedback)
if "id" not in feedback_create:
feedback_create["id"] = uuid.uuid4()
elif isinstance(feedback_create["id"], str):
feedback_create["id"] = uuid.UUID(feedback_create["id"])

return feedback_create

@staticmethod
def _insert_runtime_env(runs: Sequence[dict]) -> None:
runtime_env = ls_env.get_runtime_environment()
Expand Down Expand Up @@ -1408,14 +1436,15 @@ def _post_batch_ingest_runs(self, body: bytes, *, _context: str):
except Exception:
logger.warning(f"Failed to batch ingest runs: {repr(e)}")

def multipart_ingest_runs(
def multipart_ingest(
self,
create: Optional[
Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]
] = None,
update: Optional[
Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]
] = None,
feedback: Optional[Sequence[Union[ls_schemas.Feedback, Dict]]] = None,
*,
pre_sampled: bool = False,
) -> None:
Expand All @@ -1442,7 +1471,7 @@ def multipart_ingest_runs(
- The run objects MUST contain the dotted_order and trace_id fields
to be accepted by the API.
"""
if not create and not update:
if not (create or update or feedback):
return
# transform and convert to dicts
all_attachments: Dict[str, ls_schemas.Attachments] = {}
Expand All @@ -1454,6 +1483,7 @@ def multipart_ingest_runs(
self._run_transform(run, update=True, attachments_collector=all_attachments)
for run in update or EMPTY_SEQ
]
feedback_dicts = [self._feedback_transform(f) for f in feedback or EMPTY_SEQ]
# require trace_id and dotted_order
if create_dicts:
for run in create_dicts:
Expand Down Expand Up @@ -1491,21 +1521,26 @@ def multipart_ingest_runs(
if not pre_sampled:
create_dicts = self._filter_for_sampling(create_dicts)
update_dicts = self._filter_for_sampling(update_dicts, patch=True)
if not create_dicts and not update_dicts:
if not create_dicts and not update_dicts and not feedback_dicts:
return
# insert runtime environment
self._insert_runtime_env(create_dicts)
self._insert_runtime_env(update_dicts)
# send the runs in multipart requests
acc_context: List[str] = []
acc_parts: MultipartParts = []
for event, payloads in (("post", create_dicts), ("patch", update_dicts)):
for event, payloads in (
("post", create_dicts),
("patch", update_dicts),
("feedback", feedback_dicts),
):
for payload in payloads:
# collect fields to be sent as separate parts
fields = [
("inputs", payload.pop("inputs", None)),
("outputs", payload.pop("outputs", None)),
("events", payload.pop("events", None)),
("feedback", payload.pop("feedback", None)),
]
# encode the main run payload
payloadb = _dumps_json(payload)
Expand Down Expand Up @@ -4115,6 +4150,7 @@ def _submit_feedback(**kwargs):
),
feedback_source_type=ls_schemas.FeedbackSourceType.MODEL,
project_id=project_id,
trace_id=run.trace_id if run else None,
)
return results

Expand Down Expand Up @@ -4185,6 +4221,7 @@ def create_feedback(
project_id: Optional[ID_TYPE] = None,
comparative_experiment_id: Optional[ID_TYPE] = None,
feedback_group_id: Optional[ID_TYPE] = None,
trace_id: Optional[ID_TYPE] = None,
**kwargs: Any,
) -> ls_schemas.Feedback:
"""Create a feedback in the LangSmith API.
Expand All @@ -4194,6 +4231,8 @@ def create_feedback(
run_id : str or UUID
The ID of the run to provide feedback for. Either the run_id OR
the project_id must be provided.
trace_id : str or UUID
The trace ID of the run to provide feedback for. This is optional.
key : str
The name of the metric or 'aspect' this feedback is about.
score : float or int or bool or None, default=None
Expand Down Expand Up @@ -4241,66 +4280,87 @@ def create_feedback(
f" endpoint: {sorted(kwargs)}",
DeprecationWarning,
)
if not isinstance(feedback_source_type, ls_schemas.FeedbackSourceType):
feedback_source_type = ls_schemas.FeedbackSourceType(feedback_source_type)
if feedback_source_type == ls_schemas.FeedbackSourceType.API:
feedback_source: ls_schemas.FeedbackSourceBase = (
ls_schemas.APIFeedbackSource(metadata=source_info)
try:
if not isinstance(feedback_source_type, ls_schemas.FeedbackSourceType):
feedback_source_type = ls_schemas.FeedbackSourceType(
feedback_source_type
)
if feedback_source_type == ls_schemas.FeedbackSourceType.API:
feedback_source: ls_schemas.FeedbackSourceBase = (
ls_schemas.APIFeedbackSource(metadata=source_info)
)
elif feedback_source_type == ls_schemas.FeedbackSourceType.MODEL:
feedback_source = ls_schemas.ModelFeedbackSource(metadata=source_info)
else:
raise ValueError(f"Unknown feedback source type {feedback_source_type}")
feedback_source.metadata = (
feedback_source.metadata if feedback_source.metadata is not None else {}
)
elif feedback_source_type == ls_schemas.FeedbackSourceType.MODEL:
feedback_source = ls_schemas.ModelFeedbackSource(metadata=source_info)
else:
raise ValueError(f"Unknown feedback source type {feedback_source_type}")
feedback_source.metadata = (
feedback_source.metadata if feedback_source.metadata is not None else {}
)
if source_run_id is not None and "__run" not in feedback_source.metadata:
feedback_source.metadata["__run"] = {"run_id": str(source_run_id)}
if feedback_source.metadata and "__run" in feedback_source.metadata:
# Validate that the linked run ID is a valid UUID
# Run info may be a base model or dict.
_run_meta: Union[dict, Any] = feedback_source.metadata["__run"]
if hasattr(_run_meta, "dict") and callable(_run_meta):
_run_meta = _run_meta.dict()
if "run_id" in _run_meta:
_run_meta["run_id"] = str(
_as_uuid(
feedback_source.metadata["__run"]["run_id"],
"feedback_source.metadata['__run']['run_id']",
if source_run_id is not None and "__run" not in feedback_source.metadata:
feedback_source.metadata["__run"] = {"run_id": str(source_run_id)}
if feedback_source.metadata and "__run" in feedback_source.metadata:
# Validate that the linked run ID is a valid UUID
# Run info may be a base model or dict.
_run_meta: Union[dict, Any] = feedback_source.metadata["__run"]
if hasattr(_run_meta, "dict") and callable(_run_meta):
_run_meta = _run_meta.dict()
if "run_id" in _run_meta:
_run_meta["run_id"] = str(
_as_uuid(
feedback_source.metadata["__run"]["run_id"],
"feedback_source.metadata['__run']['run_id']",
)
)
feedback_source.metadata["__run"] = _run_meta
feedback = ls_schemas.FeedbackCreate(
id=_ensure_uuid(feedback_id),
# If run_id is None, this is interpreted as session-level
# feedback.
run_id=_ensure_uuid(run_id, accept_null=True),
trace_id=_ensure_uuid(trace_id, accept_null=True),
key=key,
score=score,
value=value,
correction=correction,
comment=comment,
feedback_source=feedback_source,
created_at=datetime.datetime.now(datetime.timezone.utc),
modified_at=datetime.datetime.now(datetime.timezone.utc),
feedback_config=feedback_config,
session_id=_ensure_uuid(project_id, accept_null=True),
comparative_experiment_id=_ensure_uuid(
comparative_experiment_id, accept_null=True
),
feedback_group_id=_ensure_uuid(feedback_group_id, accept_null=True),
)

feedback_block = _dumps_json(feedback.dict(exclude_none=True))
use_multipart = (self.info.batch_ingest_config or {}).get(
"use_multipart_endpoint", False
)

if (
use_multipart
and self.tracing_queue is not None
and feedback.trace_id is not None
):
self.tracing_queue.put(
TracingQueueItem(str(feedback.id), "feedback", feedback)
)
feedback_source.metadata["__run"] = _run_meta
feedback = ls_schemas.FeedbackCreate(
id=_ensure_uuid(feedback_id),
# If run_id is None, this is interpreted as session-level
# feedback.
run_id=_ensure_uuid(run_id, accept_null=True),
key=key,
score=score,
value=value,
correction=correction,
comment=comment,
feedback_source=feedback_source,
created_at=datetime.datetime.now(datetime.timezone.utc),
modified_at=datetime.datetime.now(datetime.timezone.utc),
feedback_config=feedback_config,
session_id=_ensure_uuid(project_id, accept_null=True),
comparative_experiment_id=_ensure_uuid(
comparative_experiment_id, accept_null=True
),
feedback_group_id=_ensure_uuid(feedback_group_id, accept_null=True),
)
feedback_block = _dumps_json(feedback.dict(exclude_none=True))
self.request_with_retries(
"POST",
"/feedback",
request_kwargs={
"data": feedback_block,
},
stop_after_attempt=stop_after_attempt,
retry_on=(ls_utils.LangSmithNotFoundError,),
)
return ls_schemas.Feedback(**feedback.dict())
else:
self.request_with_retries(
"POST",
"/feedback",
request_kwargs={
"data": feedback_block,
},
stop_after_attempt=stop_after_attempt,
retry_on=(ls_utils.LangSmithNotFoundError,),
)
return ls_schemas.Feedback(**feedback.dict())
except Exception as e:
logger.error("Error creating feedback", exc_info=True)
raise e

def update_feedback(
self,
Expand Down
2 changes: 2 additions & 0 deletions python/langsmith/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ class FeedbackBase(BaseModel):
"""The time the feedback was last modified."""
run_id: Optional[UUID]
"""The associated run ID this feedback is logged for."""
trace_id: Optional[UUID]
"""The associated trace ID this feedback is logged for."""
key: str
"""The metric name, tag, or aspect to provide feedback on."""
score: SCORE_TYPE = None
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langsmith"
version = "0.1.137"
version = "0.1.138rc1"
description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
authors = ["LangChain <[email protected]>"]
license = "MIT"
Expand Down
Loading

0 comments on commit 6dcd332

Please sign in to comment.