Refresh batch score v2 data schema to align with OpenAI public API #2768

Merged · 7 commits · Apr 24, 2024
Changes from all commits
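
In short, this PR bumps the batch_score_llm component from 1.1.5 to 1.1.6 and refreshes the V2 output path to align with OpenAI's public Batch API: the HTTP status is threaded through every response handler into a new ScoringResult.model_response_code field, ScoringRequest learns to strip a V2 custom_id from the payload and expose it read-only, result serialization moves from the shared convert_result_list helper into schema-specific V1OutputFormatter/V2OutputFormatter classes chosen by input_schema_version, two new dataclasses (AoaiScoringError, AoaiScoringResponse) model the Azure OpenAI response envelope, and the single-file vs. separate-files output handler is now constructed once in init() and injected into CallbackFactory.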
.gitignore — 1 change: 0 additions & 1 deletion
@@ -139,4 +139,3 @@ mlruns/
 
 # ignore config files
 config.json
-out*
@@ -2,7 +2,7 @@ $schema: http://azureml/sdk-2-0/ParallelComponent.json
 type: parallel
 
 name: batch_score_llm
-version: 1.1.5
+version: 1.1.6
 display_name: Batch Score Large Language Models
 is_deterministic: False
@@ -65,6 +65,7 @@ def handle_response(
         if response_status == 200:
             return self._create_scoring_result(
                 status=ScoringResultStatus.SUCCESS,
+                model_response_code=response_status,
                 scoring_request=scoring_request,
                 start=start,
                 end=end,
@@ -78,6 +79,7 @@
 
         result = self._create_scoring_result(
             status=ScoringResultStatus.FAILURE,
+            model_response_code=response_status,
             scoring_request=scoring_request,
             start=start,
             end=end,
@@ -130,6 +132,7 @@ def _handle_exception(
         except Exception:
             return self._create_scoring_result(
                 status=ScoringResultStatus.FAILURE,
+                model_response_code=http_response.status,
                 scoring_request=scoring_request,
                 start=start,
                 end=end,
@@ -5,7 +5,9 @@
 
 import asyncio
 
-from ...utils.common import convert_result_list
+from ...utils.output_formatter import OutputFormatter
+from ...utils.v1_output_formatter import V1OutputFormatter
+from ...utils.v2_output_formatter import V2OutputFormatter
 from ..configuration.configuration import Configuration
 from ..post_processing.mini_batch_context import MiniBatchContext
 from ..post_processing.result_utils import apply_input_transformer
@@ -48,7 +50,12 @@ def run(self, payloads: "list[str]", mini_batch_context: MiniBatchContext = None):
 
         apply_input_transformer(self.__input_to_output_transformer, scoring_results)
 
-        results = convert_result_list(
+        output_formatter: OutputFormatter
+        if self._configuration.input_schema_version == 1:
+            output_formatter = V1OutputFormatter()
+        else:
+            output_formatter = V2OutputFormatter()
+        results = output_formatter.format_output(
             results=scoring_results,
             batch_size_per_request=self._configuration.batch_size_per_request)
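
The diff above selects a formatter by input_schema_version but does not show the OutputFormatter base class itself. A minimal sketch of the interface implied by the call site, assuming format_output is its only required method (the real class in utils/output_formatter.py may differ):

from abc import ABC, abstractmethod

class OutputFormatter(ABC):
    """Sketch of the interface implied by the call site above."""

    @abstractmethod
    def format_output(self, results: "list[ScoringResult]",
                      batch_size_per_request: int) -> "list[str]":
        """Serialize scoring results into output-file lines."""
        raise NotImplementedError

Selecting the formatter at the call site keeps both schemas behind one interface, so the driver code above no longer embeds any serialization details.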
@@ -5,7 +5,9 @@
 
 import traceback
 
-from ...utils.common import convert_result_list
+from ...utils.output_formatter import OutputFormatter
+from ...utils.v1_output_formatter import V1OutputFormatter
+from ...utils.v2_output_formatter import V2OutputFormatter
 from ..configuration.configuration import Configuration
 from ..scoring.scoring_result import ScoringResult
 from ..telemetry import logging_utils as lu
@@ -16,7 +18,7 @@
     apply_input_transformer,
     get_return_value,
 )
-from .output_handler import SingleFileOutputHandler, SeparateFileOutputHandler
+from .output_handler import OutputHandler
 
 
 def add_callback(callback, cur):
@@ -33,9 +35,11 @@ class CallbackFactory:
 
     def __init__(self,
                  configuration: Configuration,
+                 output_handler: OutputHandler,
                  input_to_output_transformer):
        """Initialize CallbackFactory."""
        self._configuration = configuration
+        self._output_handler = output_handler
        self.__input_to_output_transformer = input_to_output_transformer
 
    def generate_callback(self):
@@ -46,7 +50,12 @@ def generate_callback(self):
        return callback
 
    def _convert_result_list(self, scoring_results: "list[ScoringResult]", mini_batch_context: MiniBatchContext):
-        return convert_result_list(
+        output_formatter: OutputFormatter
+        if self._configuration.input_schema_version == 1:
+            output_formatter = V1OutputFormatter()
+        else:
+            output_formatter = V2OutputFormatter()
+        return output_formatter.format_output(
            results=scoring_results,
            batch_size_per_request=self._configuration.batch_size_per_request)
@@ -64,13 +73,7 @@ def _save_mini_batch_result_and_emit(
        if mini_batch_context.exception is None:
            if self._configuration.save_mini_batch_results == "enabled":
                lu.get_logger().info("save_mini_batch_results is enabled")
-                if (self._configuration.split_output):
-                    output_handler = SeparateFileOutputHandler()
-                    lu.get_logger().info("Saving successful results and errors to separate files")
-                else:
-                    output_handler = SingleFileOutputHandler()
-                    lu.get_logger().info("Saving results to single file")
-                output_handler.save_mini_batch_results(
+                self._output_handler.save_mini_batch_results(
                    scoring_results,
                    self._configuration.mini_batch_results_out_directory,
                    mini_batch_context.raw_mini_batch_context
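
Because the handler is now injected, the per-mini-batch construction that previously lived in _save_mini_batch_result_and_emit goes away: the split_output decision is made once in main.init() (see the main.py diff below), and every callback reuses the same handler instance.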
@@ -0,0 +1,14 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""This file contains the data class for Azure OpenAI scoring error."""
+
+from dataclasses import dataclass
+
+
+@dataclass
+class AoaiScoringError:
+    """Azure OpenAI scoring error."""
+
+    code: str = None
+    message: str = None
@@ -0,0 +1,15 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""This file contains the data class for Azure OpenAI scoring response."""
+
+from dataclasses import dataclass
+
+
+@dataclass
+class AoaiScoringResponse:
+    """Azure OpenAI scoring response."""
+
+    body: any = None
+    request_id: str = None
+    status_code: int = None
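
Together, these two dataclasses mirror the response/error halves of an OpenAI Batch-style output line. A minimal sketch of how one might be assembled and serialized — the envelope, the plain asdict serialization, and the custom_id value are assumptions, not the component's confirmed format:

import json
from dataclasses import asdict, dataclass


@dataclass
class AoaiScoringResponse:  # redefined from the diff above to keep the sketch runnable
    body: any = None
    request_id: str = None
    status_code: int = None


# Hypothetical V2 output line; status_code presumably carries the
# model_response_code this PR threads through the response handlers.
ok = AoaiScoringResponse(
    body={"id": "chatcmpl-1", "object": "chat.completion"},
    request_id="req-1",
    status_code=200)
line = {"custom_id": "task-0", "response": asdict(ok), "error": None}
print(json.dumps(line))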
@@ -32,6 +32,7 @@ def handle_response(
     def _create_scoring_result(
             self,
             status: ScoringResultStatus,
+            model_response_code: int,
             scoring_request: ScoringRequest,
             start: float,
             end: float,
@@ -43,6 +44,7 @@ def _create_scoring_result(
             status=status,
             start=start,
             end=end,
+            model_response_code=model_response_code,
             request_obj=scoring_request.original_payload_obj,
             request_metadata=scoring_request.request_metadata,
             response_body=http_post_response.payload,
@@ -17,6 +17,7 @@ class ScoringRequest:
 
     __BATCH_REQUEST_METADATA = "_batch_request_metadata"
     __REQUEST_METADATA = "request_metadata"
+    __CUSTOM_ID = "custom_id"
 
     def __init__(
             self,
@@ -55,6 +56,8 @@ def __init__(
         # These properties do not need to be sent to the model & will be added to the output file directly
         self.__request_metadata = self.__cleaned_payload_obj.pop(self.__BATCH_REQUEST_METADATA, None)
         self.__request_metadata = self.__cleaned_payload_obj.pop(self.__REQUEST_METADATA, self.__request_metadata)
+        # If custom_id exists (V2 input schema), make sure it is not sent to MIR endpoint
+        self.__CUSTOM_ID = self.__cleaned_payload_obj.pop(self.__CUSTOM_ID, None)
 
         self.__cleaned_payload = json.dumps(self.__cleaned_payload_obj, cls=BatchComponentJSONEncoder)
         self.__loggable_payload = json.dumps(self.__loggable_payload_obj, cls=BatchComponentJSONEncoder)
@@ -136,6 +139,12 @@ def segment_id(self) -> int:
         """Get the segment id."""
         return self.__segment_id
 
+    # read-only
+    @property
+    def custom_id(self) -> str:
+        """Get the custom id. Only valid for V2 input schema."""
+        return self.__CUSTOM_ID
+
     @estimated_cost.setter
     def estimated_cost(self, cost: int):
         """Set the estimated cost."""
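
To illustrate the V2 custom_id handling: the constructor pops the key from the payload before the payload is serialized for the endpoint, then surfaces it through the new read-only property. A self-contained sketch with a hypothetical input line:

import json

# Hypothetical V2 input line; "custom_id" is stripped so it is never
# forwarded to the scoring endpoint.
raw_line = '{"custom_id": "task-42", "messages": [{"role": "user", "content": "hi"}]}'
payload_obj = json.loads(raw_line)

custom_id = payload_obj.pop("custom_id", None)  # the same pop __init__ performs above
print(custom_id)                # -> task-42
print(json.dumps(payload_obj))  # -> body sent to the endpoint, without custom_id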
@@ -58,6 +58,7 @@ def __init__(
             status: ScoringResultStatus,
             start: float,
             end: float,
+            model_response_code: int,
             request_obj: any,
             request_metadata: any,
             response_body: any,
@@ -70,6 +71,7 @@ def __init__(
         self.status = status
         self.start = start
         self.end = end
+        self.model_response_code = model_response_code
         self.request_obj = request_obj  # Normalize to json
         self.request_metadata = request_metadata
         self.response_body = response_body
@@ -121,6 +123,7 @@ def Failed(scoring_request: ScoringRequest = None) -> 'ScoringResult':
             status=ScoringResultStatus.FAILURE,
             start=0,
             end=0,
+            model_response_code=None,
             request_obj=scoring_request.original_payload_obj if scoring_request else None,
             request_metadata=scoring_request.request_metadata if scoring_request else None,
             response_body=None,
@@ -140,6 +143,7 @@ def copy(self) -> 'ScoringResult':
             self.status,
             self.start,
             self.end,
+            self.model_response_code,
             self.request_obj,
             self.request_metadata,
             deepcopy(self.response_body),
assets/batch_score/components/driver/src/batch_score/main.py — 16 changes: 9 additions & 7 deletions
@@ -90,6 +90,7 @@ def init():
     global par
     global configuration
     global input_handler
+    global output_handler
 
     start = time.time()
     parser = ConfigurationParserFactory().get_parser()
@@ -103,6 +104,13 @@ def init():
     else:
         raise ValueError(f"Invalid input_schema_version: {configuration.input_schema_version}")
 
+    if (configuration.split_output):
+        output_handler = SeparateFileOutputHandler()
+        lu.get_logger().info("Will save successful results and errors to separate files")
+    else:
+        output_handler = SingleFileOutputHandler()
+        lu.get_logger().info("Will save all results to a single file")
+
     event_utils.setup_context_vars(configuration, metadata)
     setup_geneva_event_handlers()
     setup_job_log_event_handlers()
@@ -147,6 +155,7 @@ def init():
     if configuration.async_mode:
         callback_factory = CallbackFactory(
             configuration=configuration,
+            output_handler=output_handler,
             input_to_output_transformer=input_to_output_transformer)
         finished_callback = callback_factory.generate_callback()
 
@@ -201,13 +210,6 @@ def run(input_data: pd.DataFrame, mini_batch_context):
     try:
         ret = par.run(data_list, mini_batch_context)
 
-        if (configuration.split_output):
-            output_handler = SeparateFileOutputHandler()
-            lu.get_logger().info("Saving successful results and errors to separate files")
-        else:
-            output_handler = SingleFileOutputHandler()
-            lu.get_logger().info("Saving results to single file")
-
         if configuration.save_mini_batch_results == "enabled":
             lu.get_logger().info("save_mini_batch_results is enabled")
             output_handler.save_mini_batch_results(
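
Building the handler once in init() and storing it as a module global lets the synchronous run() path and the async CallbackFactory path share one instance, and moves the split_output branch from per-mini-batch to per-job.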
@@ -1,7 +1,7 @@
 {
-    "component_version": "1.1.5",
+    "component_version": "1.1.6",
     "component_directory": "driver/batch_score_llm",
     "component_name": "batch_score_llm",
     "virtual_environment_name": null,
     "registry_name": "azureml"
 }
@@ -104,6 +104,7 @@ def handle_response(
         if response_status == 200:
             result = self._create_scoring_result(
                 status=ScoringResultStatus.SUCCESS,
+                model_response_code=response_status,
                 scoring_request=scoring_request,
                 start=start,
                 end=end,
@@ -124,6 +125,7 @@ def handle_response(
         else:  # Score failed
             result = self._create_scoring_result(
                 status=ScoringResultStatus.FAILURE,
+                model_response_code=response_status,
                 scoring_request=scoring_request,
                 start=start,
                 end=end,
@@ -3,14 +3,9 @@
 
 """Common utilities."""
 
-import json
 from argparse import ArgumentParser
 from urllib.parse import urlparse
 
-from ..common.scoring.scoring_result import ScoringResult
-from . import embeddings_utils as embeddings
-from .json_encoder_extensions import BatchComponentJSONEncoder
-
 
 def get_base_url(url: str) -> str:
     """Get base url."""
@@ -38,39 +33,7 @@ def str2bool(v):
     raise ArgumentParser.ArgumentTypeError('Boolean value expected.')
 
 
-def convert_result_list(results: "list[ScoringResult]", batch_size_per_request: int) -> "list[str]":
-    """Convert scoring results to the result list."""
-    output_list: list[dict[str, str]] = []
-    for scoringResult in results:
-        output: dict[str, str] = {}
-        output["status"] = scoringResult.status.name
-        output["start"] = scoringResult.start
-        output["end"] = scoringResult.end
-        output["request"] = scoringResult.request_obj
-        output["response"] = scoringResult.response_body
-
-        if scoringResult.segmented_response_bodies is not None and len(scoringResult.segmented_response_bodies) > 0:
-            output["segmented_responses"] = scoringResult.segmented_response_bodies
-
-        if scoringResult.request_metadata is not None:
-            output["request_metadata"] = scoringResult.request_metadata
-
-        if batch_size_per_request > 1:
-            batch_output_list = embeddings._convert_to_list_of_output_items(
-                output,
-                scoringResult.estimated_token_counts)
-            output_list.extend(batch_output_list)
-        else:
-            output_list.append(output)
-
-    return list(map(__stringify_output, output_list))
-
-
 def get_mini_batch_id(mini_batch_context: any):
     """Get mini batch id from mini batch context."""
     if mini_batch_context:
         return mini_batch_context.mini_batch_id
-
-
-def __stringify_output(payload_obj: dict) -> str:
-    return json.dumps(payload_obj, cls=BatchComponentJSONEncoder)
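
The removed convert_result_list and __stringify_output presumably live on behind V1OutputFormatter.format_output: the new call sites pass the same results and batch_size_per_request arguments, so V1-schema output lines should be effectively unchanged.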