Batch score component 1.1.4 release (#2603)
* Batch score component 1.1.4 release

* Fix linter errors

* Update

* Update

* Fix e2e test

SagarikaKengunte authored Mar 28, 2024
1 parent 12dd5fd commit d11affa
Showing 24 changed files with 509 additions and 117 deletions.
@@ -2,7 +2,7 @@ $schema: http://azureml/sdk-2-0/ParallelComponent.json
type: parallel

name: batch_score_llm
-version: 1.1.3
+version: 1.1.4
display_name: Batch Score Large Language Models
is_deterministic: False

@@ -18,8 +18,10 @@
    ScoringResultStatus,
)
from ...common.scoring.tally_failed_request_handler import TallyFailedRequestHandler
+from ...common.scoring.scoring_utils import get_prompt_tokens, get_completion_tokens
from ...common.telemetry.events import event_utils
from ...common.telemetry.events.batch_score_request_completed_event import BatchScoreRequestCompletedEvent
+from ...utils.common import get_mini_batch_id


class AoaiHttpResponseHandler(HttpResponseHandler):
@@ -144,22 +146,6 @@ def _emit_request_completed_event(
            end: float,
            worker_id: str) -> None:

-        def get_prompt_tokens(response_body: any):
-            if not isinstance(response_body, dict):
-                return None
-
-            return response_body.get("usage", {}).get("prompt_tokens")
-
-        def get_completion_tokens(response_body: any):
-            if not isinstance(response_body, dict):
-                return None
-
-            return response_body.get("usage", {}).get("completion_tokens")
-
-        def get_mini_batch_id(mini_batch_context: any):
-            if mini_batch_context:
-                return mini_batch_context.mini_batch_id
-
        def get_model_name(response_body: any):
            if not isinstance(response_body, dict):
                return None
@@ -31,6 +31,7 @@ class Configuration(Namespace):
    ensure_ascii: bool = field(init=True, default=None)
    image_input_folder: str = field(init=True, default=None)
    initial_worker_count: int = field(init=True, default=None)
+    input_schema_version: int = field(init=True, default=1)
    max_retry_time_interval: int = field(init=True, default=None)
    max_worker_count: int = field(init=True, default=None)
    mini_batch_results_out_directory: str = field(init=True, default=None)
@@ -44,6 +45,7 @@ class Configuration(Namespace):
    segment_large_requests: str = field(init=True, default=None)
    segment_max_token_size: int = field(init=True, default=None)
    service_namespace: str = field(init=True, default=None)
+    split_output: bool = field(init=True, default=False)
    stdout_log_level: str = field(init=True, default="debug")
    tally_exclusions: str = field(init=True, default=None)
    tally_failed_requests: bool = field(init=True, default=None)
@@ -58,6 +58,7 @@ def parse_configuration(self, args: "list[str]" = None) -> Configuration:
            ensure_ascii=config.get('output_settings', {}).get('ensure_ascii'),
            image_input_folder=None,
            initial_worker_count=config.get('concurrency_settings', {}).get('initial_worker_count'),
+            input_schema_version=config.get('request_settings', {}).get('input_schema_version'),
            max_retry_time_interval=config.get('request_settings', {}).get('timeout'),
            max_worker_count=config.get('concurrency_settings', {}).get('max_worker_count'),
            mini_batch_results_out_directory=parsed_args.partitioned_scoring_results,
@@ -71,6 +72,7 @@ def parse_configuration(self, args: "list[str]" = None) -> Configuration:
            segment_large_requests=config.get('api', {}).get('response_segment_size') > 0,
            segment_max_token_size=config.get('api', {}).get('response_segment_size'),
            service_namespace=None,
+            split_output=config.get('output_settings', {}).get('split_output'),
            stdout_log_level=config.get('log_settings', {}).get('stdout_log_level'),
            tally_exclusions=None,
            tally_failed_requests=None,
@@ -88,6 +88,7 @@ def _apply_defaults(self, instance):
        instance["request_settings"].setdefault("headers", {})
        instance["request_settings"].setdefault("properties", {})
        instance["request_settings"].setdefault("timeout", 600)
+        instance["request_settings"].setdefault("input_schema_version", 1)

        # log_settings
        instance.setdefault("log_settings", {})
@@ -98,6 +99,7 @@ def _apply_defaults(self, instance):
        instance.setdefault("output_settings", {})
        instance["output_settings"].setdefault("ensure_ascii", False)
        instance["output_settings"].setdefault("save_partitioned_scoring_results", True)
+        instance["output_settings"].setdefault("split_output", False)

        return instance

@@ -14,6 +14,11 @@
      "type": "boolean",
      "default": true,
      "description": "If true, scoring results will be written to multiple files in the `partitioned_scoring_results` directory. If false, scoring results are returned to the caller."
-    }
+    },
+    "split_output": {
+      "type": "boolean",
+      "default": false,
+      "description": "If true, the successful results and errors will be written to separate files. If false, the output will be written to a single file."
+    }
  }
}
@@ -24,6 +24,11 @@
    "user_agent_identifier": {
      "type": "string",
      "description": "An identifier to be included in the User-Agent header for all outgoing requests."
-    }
+    },
+    "input_schema_version": {
+      "type": "integer",
+      "default": 1,
+      "description": "The version of the input schema used in the input data. Defaults to 1."
+    }
  }
}
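
Taken together, a minimal sketch (not part of the commit) of how the two new settings flow from a file configuration into the parsed Configuration; the section and key names follow the parser and validator shown above, everything else is illustrative:

# Illustrative file configuration; section names mirror the parser above.
config = {
    "request_settings": {
        "input_schema_version": 1,   # new in 1.1.4; defaulted to 1 by _apply_defaults
    },
    "output_settings": {
        "save_partitioned_scoring_results": True,
        "split_output": True,        # new in 1.1.4; defaulted to False by _apply_defaults
    },
}

# parse_configuration then surfaces them as Configuration fields:
input_schema_version = config.get('request_settings', {}).get('input_schema_version')  # -> 1
split_output = config.get('output_settings', {}).get('split_output')                   # -> True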
@@ -15,12 +15,13 @@
from .result_utils import (
    apply_input_transformer,
    get_return_value,
-    save_mini_batch_results,
)
+from .output_handler import SingleFileOutputHandler, SeparateFileOutputHandler


def add_callback(callback, cur):
    """Append a callback to a list."""

    def wrapper(scoring_results: "list[ScoringResult]", mini_batch_context: MiniBatchContext):
        scoring_results = callback(scoring_results, mini_batch_context)
        return cur(scoring_results, mini_batch_context)
@@ -63,10 +64,17 @@ def _save_mini_batch_result_and_emit(
        if mini_batch_context.exception is None:
            if self._configuration.save_mini_batch_results == "enabled":
                lu.get_logger().info("save_mini_batch_results is enabled")
-                save_mini_batch_results(
+                if self._configuration.split_output:
+                    output_handler = SeparateFileOutputHandler()
+                    lu.get_logger().info("Saving successful results and errors to separate files")
+                else:
+                    output_handler = SingleFileOutputHandler()
+                    lu.get_logger().info("Saving results to single file")
+                output_handler.save_mini_batch_results(
                    scoring_results,
                    self._configuration.mini_batch_results_out_directory,
-                    mini_batch_context.raw_mini_batch_context)
+                    mini_batch_context.raw_mini_batch_context
+                )
            else:
                lu.get_logger().info("save_mini_batch_results is disabled")

@@ -0,0 +1,77 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""This file contains the definition for the Output Handler."""

import os

from abc import ABC, abstractmethod
from ..telemetry import logging_utils as lu


class OutputHandler(ABC):
    """An abstract class for handling output."""

    @abstractmethod
    def save_mini_batch_results(
            self,
            mini_batch_results: list,
            mini_batch_results_out_directory: str,
            raw_mini_batch_context):
        """Abstract save method."""
        pass


class SingleFileOutputHandler(OutputHandler):
    """Defines a class to emit all results to a single output file. This is used as the default output handler."""

    def save_mini_batch_results(
            self,
            mini_batch_results: list,
            mini_batch_results_out_directory: str,
            raw_mini_batch_context):
        """Save mini batch results to a single file."""
        lu.get_logger().debug("mini_batch_results_out_directory: {}".format(mini_batch_results_out_directory))

        filename = f"{raw_mini_batch_context.minibatch_index}.jsonl"
        file_path = os.path.join(mini_batch_results_out_directory, filename)

        lu.get_logger().debug(f"Start saving {len(mini_batch_results)} results to file {file_path}.")
        with open(file_path, "w", encoding="utf-8") as writer:
            for item in mini_batch_results:
                writer.write(item + "\n")

        lu.get_logger().info(f"Completed saving {len(mini_batch_results)} results to file {file_path}.")


class SeparateFileOutputHandler(OutputHandler):
    """Defines a class to emit successful results and errors to separate output files."""

    def save_mini_batch_results(
            self,
            mini_batch_results: list,
            mini_batch_results_out_directory: str,
            raw_mini_batch_context):
        """Save successful mini batch results and errors to two separate files."""
        lu.get_logger().debug("mini_batch_results_out_directory: {}".format(mini_batch_results_out_directory))

        os.makedirs(os.path.join(mini_batch_results_out_directory, "results"), exist_ok=True)
        success_filename = f"results/results_{raw_mini_batch_context.minibatch_index}.jsonl"
        success_file_path = os.path.join(mini_batch_results_out_directory, success_filename)

        os.makedirs(os.path.join(mini_batch_results_out_directory, "errors"), exist_ok=True)
        error_filename = f"errors/errors_{raw_mini_batch_context.minibatch_index}.jsonl"
        error_file_path = os.path.join(mini_batch_results_out_directory, error_filename)

        lu.get_logger().debug(
            f"Start saving {len(mini_batch_results)} results to files "
            f"{success_file_path} and {error_file_path}.")
        with open(success_file_path, "w", encoding="utf-8") as success_writer, \
                open(error_file_path, "w", encoding="utf-8") as error_writer:
            for item in mini_batch_results:
                if '"status": "SUCCESS"' in item:
                    success_writer.write(item + "\n")
                else:
                    error_writer.write(item + "\n")

        lu.get_logger().info(
            f"Completed saving {len(mini_batch_results)} results to files "
            f"{success_file_path} and {error_file_path}.")
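
As a quick illustration of the resulting layout, a minimal usage sketch (not from the commit; it assumes SeparateFileOutputHandler is importable and that the mini-batch context only needs to expose minibatch_index):

import tempfile
from types import SimpleNamespace

# Hypothetical stand-in for raw_mini_batch_context; only minibatch_index is read.
context = SimpleNamespace(minibatch_index=0)
results = [
    '{"status": "SUCCESS", "response": "ok"}',
    '{"status": "FAILURE", "error": "timed out"}',
]

out_dir = tempfile.mkdtemp()
SeparateFileOutputHandler().save_mini_batch_results(results, out_dir, context)
# Expected layout:
#   <out_dir>/results/results_0.jsonl  <- the SUCCESS line
#   <out_dir>/errors/errors_0.jsonl    <- the FAILURE line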
@@ -3,8 +3,6 @@

"""Result utilities."""

-import os
-
from ..request_modification.input_transformer import InputTransformer
from ..scoring.scoring_result import ScoringResult
from ..telemetry import logging_utils as lu
@@ -41,18 +39,3 @@ def get_return_value(ret: 'list[str]', output_behavior: str):

    lu.get_logger().info("Returning results in append_row mode.")
    return ret
-
-
-def save_mini_batch_results(mini_batch_results: list, mini_batch_results_out_directory: str, raw_mini_batch_context):
-    """Save mini batch results."""
-    lu.get_logger().debug("mini_batch_results_out_directory: {}".format(mini_batch_results_out_directory))
-
-    filename = f"{raw_mini_batch_context.minibatch_index}.jsonl"
-    file_path = os.path.join(mini_batch_results_out_directory, filename)
-
-    lu.get_logger().debug(f"Start saving {len(mini_batch_results)} results to file {file_path}.")
-    with open(file_path, "w", encoding="utf-8") as writer:
-        for item in mini_batch_results:
-            writer.write(item + "\n")
-
-    lu.get_logger().info(f"Completed saving {len(mini_batch_results)} results to file {file_path}.")
@@ -71,3 +71,19 @@ def get_retriable_type(
        return RetriableType.RETRY_UNTIL_MAX_RETRIES

    return RetriableType.NOT_RETRIABLE
+
+
+def get_prompt_tokens(response_body: any):
+    """Get prompt token count from http response."""
+    if not isinstance(response_body, dict):
+        return None
+
+    return response_body.get("usage", {}).get("prompt_tokens")
+
+
+def get_completion_tokens(response_body: any):
+    """Get completion token count from http response."""
+    if not isinstance(response_body, dict):
+        return None
+
+    return response_body.get("usage", {}).get("completion_tokens")
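
A small illustration (assumed, not from the diff) of the response shape these helpers expect, namely the OpenAI-style usage block:

# OpenAI-style response body carrying a usage block.
body = {"usage": {"prompt_tokens": 12, "completion_tokens": 34, "total_tokens": 46}}

assert get_prompt_tokens(body) == 12
assert get_completion_tokens(body) == 34
assert get_prompt_tokens("not a dict") is None   # non-dict bodies are tolerated
assert get_completion_tokens({}) is None         # a missing usage block yields None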
@@ -15,6 +15,8 @@
from .events.batch_score_request_completed_event import BatchScoreRequestCompletedEvent
from .events.batch_score_input_row_completed_event import BatchScoreInputRowCompletedEvent

+from . import logging_utils as lu


class MinibatchAggregator:
    """Minibatch aggregator."""
@@ -80,6 +82,10 @@ def summarize(
        row_completed_timestamps = sorted(
            e.event_time.timestamp() for e in rows_completed_events) or [minibatch_start_time]

+        lu.get_logger().info(f"Minibatch {minibatch_id}: Successfully summarized "
+                             f"http_request_completed_events: {len(http_request_completed_events)}, "
+                             f"rows_completed_events: {len(rows_completed_events)}.")
+
        return BatchScoreMinibatchCompletedEvent(
            minibatch_id=minibatch_id,
            scoring_url=self._start_event_per_minibatch[minibatch_id].scoring_url,
@@ -127,13 +133,15 @@ def summarize_endpoints(self, minibatch_id: str) -> list:
        if not self._emit_endpoint_health_events:
            return []

+        processed_event_count = 0
        request_endpoint_map = self._http_request_completed_events_per_minibatch_per_endpoint
        http_request_completed_events_per_endpoint = request_endpoint_map[minibatch_id]

        endpoint_health_events = []
        for endpoint_uri, http_request_completed_events in http_request_completed_events_per_endpoint.items():
            http_request_durations_ms = [e.duration_ms for e in http_request_completed_events] or [0]

+            processed_event_count += len(http_request_completed_events)
            event = BatchScoreMinibatchEndpointHealthEvent(
                minibatch_id=minibatch_id,
                scoring_url=endpoint_uri,
@@ -157,13 +165,16 @@
            )
            endpoint_health_events.append(event)

+        lu.get_logger().info(f"Minibatch {minibatch_id}: Successfully summarized "
+                             f"http_request_completed_events: {processed_event_count}.")
+
        return endpoint_health_events

    def _is_request_succeeded(self, event: BatchScoreEvent) -> bool:
-        return 200 <= abs(event.response_code) < 300
+        return event.response_code and 200 <= abs(event.response_code) < 300

    def _is_request_user_error(self, event: BatchScoreEvent) -> bool:
-        return 400 <= abs(event.response_code) < 500
+        return event.response_code and 400 <= abs(event.response_code) < 500

    def _is_request_system_error(self, event: BatchScoreEvent) -> bool:
-        return 500 <= abs(event.response_code) < 600
+        return event.response_code and 500 <= abs(event.response_code) < 600
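
The guard added to all three predicates matters when a request never received a response code: abs(None) raises a TypeError, so minibatch summarization could fail on such events. A standalone sketch of the guarded form (illustrative, not from the diff):

def is_request_succeeded(response_code):
    # Short-circuits on None/0 instead of calling abs(None).
    return response_code and 200 <= abs(response_code) < 300

print(is_request_succeeded(200))   # True
print(is_request_succeeded(-429))  # False; abs() mirrors the predicates above
print(is_request_succeeded(None))  # None (falsy) -- the unguarded form raised:
                                   # TypeError: bad operand type for abs(): 'NoneType'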