Skip to content

Commit

Permalink
test-impl
Browse files Browse the repository at this point in the history
Signed-off-by: Apurva Koti <[email protected]>
  • Loading branch information
apurva-koti committed May 6, 2024
1 parent 7494e32 commit 9e6e32f
Showing 1 changed file with 150 additions and 56 deletions.
206 changes: 150 additions & 56 deletions mlflow/metrics/genai/genai_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from inspect import Parameter, Signature
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, Tuple

from mlflow.exceptions import MlflowException
from mlflow.metrics.base import MetricValue
from mlflow.metrics.genai import model_utils
from mlflow.metrics.genai.base import EvaluationExample
from mlflow.metrics.genai.prompt_template import PromptTemplate
from mlflow.metrics.genai.utils import _get_default_model, _get_latest_metric_version
from mlflow.models import EvaluationMetric, make_metric
from mlflow.protos.databricks_pb2 import (
Expand All @@ -26,6 +27,14 @@

_logger = logging.getLogger(__name__)

_PROMPT_FORMATTING_WRAPPER = """
You must return the following fields in your response in two lines, one below the other:
score: Your numerical score for the model's {name} based on the rubric
justification: Your reasoning about the model's {name} score
Do not add additional new lines. Do not add any other fields.
"""


def _format_args_string(grading_context_columns: Optional[List[str]], eval_values, indx) -> str:
import pandas as pd
Expand Down Expand Up @@ -84,11 +93,132 @@ def _extract_score_and_justification(text):
return None, None


def _score_model_on_one_payload(
payload,
eval_model,
parameters,
):
try:
raw_result = model_utils.score_model_on_payload(
eval_model, payload, parameters
)
return _extract_score_and_justification(raw_result)
except ImportError:
raise
except MlflowException as e:
if e.error_code in [
ErrorCode.Name(BAD_REQUEST),
ErrorCode.Name(UNAUTHENTICATED),
ErrorCode.Name(INVALID_PARAMETER_VALUE),
]:
raise
else:
return None, f"Failed to score model on payload. Error: {e!s}"
except Exception as e:
return None, f"Failed to score model on payload. Error: {e!s}"


def _score_model_on_payloads(grading_payloads, model, parameters, max_workers) -> Tuple[List[int], List[str]]:
scores = [None] * len(grading_payloads)
justifications = [None] * len(grading_payloads)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
_score_model_on_one_payload,
payload,
model,
parameters,
): indx
for indx, payload in enumerate(grading_payloads)
}

as_comp = as_completed(futures)
try:
from tqdm.auto import tqdm

as_comp = tqdm(as_comp, total=len(futures))
except ImportError:
pass

for future in as_comp:
indx = futures[future]
score, justification = future.result()
scores[indx] = score
justifications[indx] = justification

return scores, justifications


def _get_aggregate_results(scores, aggregations):
# loop over the aggregations and compute the aggregate results on the scores
def aggregate_function(aggregate_option, scores):
import numpy as np

options = {
"min": np.min,
"max": np.max,
"mean": np.mean,
"median": np.median,
"variance": np.var,
"p90": lambda x: np.percentile(x, 90) if x else None,
}

if aggregate_option not in options:
raise MlflowException(
message=f"Invalid aggregate option {aggregate_option}.",
error_code=INVALID_PARAMETER_VALUE,
)

return options[aggregate_option](scores)

scores_for_aggregation = [score for score in scores if score is not None]

return (
{option: aggregate_function(option, scores_for_aggregation) for option in aggregations}
if aggregations is not None
else {}
)


def _make_custom_genai_metric(
name: str,
judge_prompt: Optional[str] = None,
model: Optional[str] = _get_default_model(),
parameters: Optional[Dict[str, Any]] = None,
aggregations: Optional[List[str]] = ["mean", "variance", "p90"], # noqa: B006
greater_is_better: bool = True,
max_workers: int = 10,
) -> EvaluationMetric:

def eval_fn(
*args,
**kwargs,
) -> MetricValue:
"""
This is the function that is called when the metric is evaluated.
"""
prompt_template = PromptTemplate([judge_prompt, _PROMPT_FORMATTING_WRAPPER])
grading_payloads = pd.DataFrame(kwargs).to_dict(orient="records")
arg_strings = [prompt_template.format(**payload) for payload in grading_payloads]
scores, justifications = _score_model_on_payloads(arg_strings, model, parameters, max_workers)

aggregate_scores = _get_aggregate_results(scores, aggregations)

return MetricValue(scores, justifications, aggregate_scores)

return make_metric(
eval_fn=eval_fn,
greater_is_better=greater_is_better,
name=name,
)


@experimental
def make_genai_metric(
name: str,
definition: str,
grading_prompt: str,
definition: Optional[str] = None,
grading_prompt: Optional[str] = None,
judge_prompt: Optional[str] = None,
examples: Optional[List[EvaluationExample]] = None,
version: Optional[str] = _get_latest_metric_version(),
model: Optional[str] = _get_default_model(),
Expand All @@ -107,6 +237,9 @@ def make_genai_metric(
name: Name of the metric.
definition: Definition of the metric.
grading_prompt: Grading criteria of the metric.
judge_prompt: (Optional) The entire prompt to be used for the judge model. This is useful for including
use cases or system prompts that are not covered by the full grading prompt in any ``EvaluationMetric``
object. If used,
examples: (Optional) Examples of the metric.
version: (Optional) Version of the metric. Currently supported versions are: v1.
model: (Optional) Model uri of an openai, gateway, or deployments judge model in the
Expand Down Expand Up @@ -196,6 +329,15 @@ def make_genai_metric(
greater_is_better=True,
)
"""
if judge_prompt is not None:
return _make_custom_genai_metric()

if definition is None or grading_prompt is None:
raise MlflowException(
"Both definition and grading_prompt must be provided.",
error_code=INVALID_PARAMETER_VALUE,
)

if not isinstance(grading_context_columns, list):
grading_context_columns = [grading_context_columns]

Expand Down Expand Up @@ -306,38 +448,16 @@ def eval_fn(
)
)

def score_model_on_one_payload(
payload,
eval_model,
):
try:
raw_result = model_utils.score_model_on_payload(
eval_model, payload, eval_parameters
)
return _extract_score_and_justification(raw_result)
except ImportError:
raise
except MlflowException as e:
if e.error_code in [
ErrorCode.Name(BAD_REQUEST),
ErrorCode.Name(UNAUTHENTICATED),
ErrorCode.Name(INVALID_PARAMETER_VALUE),
]:
raise
else:
return None, f"Failed to score model on payload. Error: {e!s}"
except Exception as e:
return None, f"Failed to score model on payload. Error: {e!s}"

scores = [None] * len(inputs)
justifications = [None] * len(inputs)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
score_model_on_one_payload,
_score_model_on_one_payload,
payload,
eval_model,
eval_parameters,
): indx
for indx, payload in enumerate(grading_payloads)
}
Expand All @@ -356,35 +476,7 @@ def score_model_on_one_payload(
scores[indx] = score
justifications[indx] = justification

# loop over the aggregations and compute the aggregate results on the scores
def aggregate_function(aggregate_option, scores):
import numpy as np

options = {
"min": np.min,
"max": np.max,
"mean": np.mean,
"median": np.median,
"variance": np.var,
"p90": lambda x: np.percentile(x, 90) if x else None,
}

if aggregate_option not in options:
raise MlflowException(
message=f"Invalid aggregate option {aggregate_option}.",
error_code=INVALID_PARAMETER_VALUE,
)

return options[aggregate_option](scores)

scores_for_aggregation = [score for score in scores if score is not None]

aggregate_results = (
{option: aggregate_function(option, scores_for_aggregation) for option in aggregations}
if aggregations is not None
else {}
)

aggregate_results = _get_aggregate_results(scores, aggregations)
return MetricValue(scores, justifications, aggregate_results)

signature_parameters = [
Expand All @@ -406,3 +498,5 @@ def aggregate_function(aggregate_option, scores):
version=version,
metric_details=evaluation_context["eval_prompt"].__str__(),
)


0 comments on commit 9e6e32f

Please sign in to comment.