Model Prediction Bug Fix #3614

Open · wants to merge 2 commits into base: main
@@ -3,7 +3,7 @@ name: compute_metrics
display_name: Compute Metrics
description: Calculate model performance metrics, given ground truth and prediction data.

-version: 0.0.33
+version: 0.0.35
type: command
tags:
type: evaluation
@@ -3,7 +3,7 @@ name: model_prediction
display_name: Model Prediction
description: Generate predictions on a given mlflow model for supported tasks.

-version: 0.0.34
+version: 0.0.35
type: command
tags:
type: evaluation
@@ -1,6 +1,6 @@
$schema: https://azuremlschemas.azureedge.net/latest/pipelineComponent.schema.json
name: model_evaluation_pipeline
-version: 0.0.33
+version: 0.0.35
type: pipeline
display_name: Model Evaluation Pipeline
description: Pipeline component for model evaluation for supported tasks. \
@@ -87,7 +87,7 @@ outputs:
jobs:
validation_trigger_model_evaluation:
type: command
-component: azureml:validation_trigger_model_evaluation:0.0.33
+component: azureml:validation_trigger_model_evaluation:0.0.35
compute: '${{parent.inputs.compute_name}}'
resources:
instance_type: '${{parent.inputs.instance_type}}'
@@ -111,7 +111,7 @@ jobs:

model_prediction:
type: command
-component: azureml:model_prediction:0.0.33
+component: azureml:model_prediction:0.0.35
compute: '${{parent.inputs.compute_name}}'
resources:
instance_type: '${{parent.inputs.instance_type}}'
@@ -128,7 +128,7 @@ jobs:

compute_metrics:
type: command
-component: azureml:compute_metrics:0.0.33
+component: azureml:compute_metrics:0.0.35
compute: '${{parent.inputs.compute_name}}'
resources:
instance_type: '${{parent.inputs.instance_type}}'
@@ -0,0 +1,50 @@
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: evaluation_trigger
display_name: Evaluation for Trigger
description: Evaluation job from trigger service

version: 0.0.3
type: command

inputs:
connection_string:
type: string
optional: false
description: "Connection String"
resource_id:
type: string
optional: false
description: "Resource ID"
query:
type: string
description: "DB Query"
sampling_rate:
type: string
description: "Sampling Rate"
cron_expression:
type: string
description: "Cron Expression"
evaluators:
type: string
description: "Json Serialized Evaluators info."

outputs:
preprocessed_data:
type: uri_file
evaluated_data:
type: uri_file

is_deterministic: True
environment: azureml://registries/azureml/environments/evaluations-built-in/versions/4

command: >-
python /app/online_eval/evaluate_online.py compute_metrics.py
--connection_string '${{inputs.connection_string}}'
--resource_id '${{inputs.resource_id}}'
--query '${{inputs.query}}'
--sampling_rate '${{inputs.sampling_rate}}'
--preprocessor_connection_type managed-identity
--cron_expression '${{inputs.cron_expression}}'
--preprocessed_data '${{outputs.preprocessed_data}}'
--evaluators '${{inputs.evaluators}}'
--evaluated_data '${{outputs.evaluated_data}}'
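For reference, the evaluation_trigger spec added above is a plain Azure ML command component, so once registered it can be consumed like any other component. The sketch below is not part of this PR: the local file name, the compute target, and the way the pipeline is assembled with the Python SDK v2 load_component helper are assumptions for illustration only.

# Illustrative sketch (not from this PR): load the evaluation_trigger spec above and
# wire it into a small pipeline with the Azure ML Python SDK v2.
# File name, compute target, and input values are placeholders.
from azure.ai.ml import load_component
from azure.ai.ml.dsl import pipeline

# Load the command component definition from the local YAML spec.
evaluation_trigger = load_component(source="evaluation_trigger_spec.yaml")

@pipeline(default_compute="cpu-cluster")
def online_eval_pipeline(connection_string: str, resource_id: str, query: str,
                         sampling_rate: str, cron_expression: str, evaluators: str):
    """Run the trigger-service evaluation step and surface its outputs."""
    step = evaluation_trigger(
        connection_string=connection_string,
        resource_id=resource_id,
        query=query,
        sampling_rate=sampling_rate,
        cron_expression=cron_expression,
        evaluators=evaluators,
    )
    return {
        "preprocessed_data": step.outputs.preprocessed_data,
        "evaluated_data": step.outputs.evaluated_data,
    }

Submitting the resulting pipeline job (for example with MLClient.jobs.create_or_update) would run the evaluate_online.py command shown in the spec with the supplied inputs.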
@@ -3,7 +3,7 @@ name: validation_trigger_model_evaluation
display_name: Validation Trigger Model Evaluation
description: Component for enabling validation of model evaluation pipeline.

-version: 0.0.33
+version: 0.0.35
type: command
tags:
type: evaluation
@@ -124,7 +124,7 @@ def predict(self, X_test, **kwargs):
y_pred = predict_fn(X_test)
except RuntimeError as e:
logger.warning(f"RuntimeError exception raised. Reason: {e}")
-y_pred, kwargs = self.handle_device_failure(X_test, **kwargs)
+y_pred = self.handle_device_failure(X_test, **kwargs)
if y_transformer is not None:
y_pred = y_transformer.transform(y_pred).toarray()

@@ -196,7 +196,7 @@ def prepare_chat_data_from_ft_pipeline(data: pd.DataFrame):
X_test, y_test = {local_constants.LLM_FT_CHAT_COMPLETION_KEY:[]}, []
for message in messages_col.to_list():
X_test[local_constants.LLM_FT_CHAT_COMPLETION_KEY].append(message[:-1])
-y_test.append(message[-1]["content"])
+y_test.append([message[-1]["content"]])
X_test = pd.DataFrame(X_test)
y_test = pd.Series(y_test)
return X_test, y_test.values
@@ -89,6 +89,7 @@ def postprocess(self, result):
"""
y_pred_df, y_test_df, perf_df, y_pred_proba_df = pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
for y_pred, y_test, perf, pred_probas in result:
logger.info(f"Type here as well: {type(y_test)}")
y_pred_df = pd.concat([y_pred_df, y_pred], axis=0)
y_test_df = pd.concat([y_test_df, y_test], axis=0)
perf_df = pd.concat([perf_df, perf], axis=0)
@@ -121,8 +122,8 @@ def _make_chat_completion_data(self, input_df, last_chats, col_name):
input_rows = input_df.values.tolist()
for ind, datarow in enumerate(input_rows):
conversation = datarow[0]
-conversation.append({"role":"assistant", "content":last_chats[ind]})
-appended_data[col_name].append(conversation)
+updated_conversation = conversation + [{"role":"assistant", "content":last_chats[ind]}]
+appended_data[col_name].append(updated_conversation)
return pd.DataFrame(appended_data)
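
The _make_chat_completion_data change above replaces the in-place conversation.append(...) with list concatenation. A plausible reading, illustrated by the standalone sketch below (not repository code; the column name and messages are made up), is that each conversation list is shared by reference with the input DataFrame, so appending to it mutated the caller's X_test as a side effect, whereas conversation + [...] builds a fresh list and leaves the input untouched.

# Standalone illustration (not from this PR) of the aliasing pitfall: a list stored in
# a DataFrame cell is shared by reference, so appending to it in place also changes the
# original DataFrame, while `old + [new]` builds a fresh list.
import pandas as pd

df = pd.DataFrame({"messages": [[{"role": "user", "content": "hi"}]]})

conversation = df.iloc[0, 0]                 # same list object as the DataFrame cell
conversation.append({"role": "assistant", "content": "hello"})
print(len(df.iloc[0, 0]))                    # 2 -- the input DataFrame was mutated

df = pd.DataFrame({"messages": [[{"role": "user", "content": "hi"}]]})
updated = df.iloc[0, 0] + [{"role": "assistant", "content": "hello"}]
print(len(df.iloc[0, 0]))                    # 1 -- the input DataFrame is unchanged

If that is the issue, the X_test.copy(deep=True) added further down in this diff would not be enough on its own, because pandas' deep copy does not recursively copy Python objects stored in object-dtype cells.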


@@ -183,6 +184,7 @@ def predict_single(self, data):
end_ms = time.time() * 1000
outputs = [res.response for i, res in enumerate(inference_results)]
pred_probas = [res.scores for res in inference_results]
+
perf_data = [{
PerformanceColumns.BATCH_SIZE_COLUMN_NAME: len(input_texts),
PerformanceColumns.START_TIME_COLUMN_NAME: datetime.fromtimestamp(start_ms / 1000, timezone.utc).isoformat(),
@@ -195,12 +197,15 @@
} for gt, pred in zip(input_texts, outputs)]
pred_proba_df = pd.DataFrame(pred_probas, index=X_test.index)
perf_data = pd.DataFrame(perf_data)
+
if self.task_type == SupportedTask.CHAT_COMPLETION or self.task_type == TaskType.CONVERSATIONAL:
-pred_df = self._make_chat_completion_data(X_test, outputs,
-col_name=ChatCompletionConstants.OUTPUT_FULL_CONVERSATION)
+pred_df = self._make_chat_completion_data(X_test.copy(deep=True), outputs,
+col_name=ChatCompletionConstants.OUTPUT_FULL_CONVERSATION, debug=False)
pred_df[ChatCompletionConstants.OUTPUT] = outputs
-y_test = self._make_chat_completion_data(X_test, y_test, col_name="ground_truth")
+y_test = pd.DataFrame(y_test, columns=["ground_truth"], index=X_test.index)
+# y_test = self._make_chat_completion_data(X_test.copy(deep=True), y_test, col_name="ground_truth")
return pred_df, y_test, perf_data, pred_proba_df

pred_df = pd.DataFrame(outputs, index=X_test.index, columns=["prediction"])
if isinstance(y_test, pd.Series):
y_test = y_test.to_frame()
@@ -460,15 +465,11 @@ def main():
data_path = args.data

logger.info(f"Torch Current Device Count:{torch.cuda.device_count()}")

logger.info(f"Got Params: {args.parameters}")
logger.info(f"Params type: {type(args.parameters)}")
#logger.info(f"Evaled params: {eval(args.parameters)}")
extra_params.update(json.loads(args.parameters))

logger.info(f"Got Model Path: {args.mlflow_model}")

task_type = args.task

input_column_names, label_column_name, extra_y_test_cols = validate_and_get_columns(vars(args))

try:
@@ -552,6 +553,9 @@ def main():
raise exception
full_data = [(x, y) for x, y in data]
logger.info(f"Dataset size: {len(full_data)}")
+# TODO Remove this line in Prod:
+# logger.info("First few input rows")
+# logger.info(f"{full_data[0][0].head()}")
predictor = Predictor(g_fmscorer, task_type, extra_params, num_replicas, label_column_name, tokenizer, extra_y_test_cols)
collated_res = [{} for i in range(distributed_state.num_processes)]
with distributed_state.split_between_processes(full_data) as proc_data:
@@ -563,7 +567,7 @@
logger.info("Waiting for all processes.....")
distributed_state.wait_for_everyone()
logger.info(f"Collated Results Lengths: {[len(i) for i in collated_res]}")
logger.info(f"Type of each key: {[(k, type(v), len(v)) for k, v in collated_res[0].items()]}")
# logger.info(f"Type of each key: {[(k, type(v), len(v)) for k, v in collated_res[0].items()]}")
y_pred_df, y_test_df, y_perf_df, y_pred_proba_df = _gather_predictions(collated_res)

if task_type != SupportedTask.CHAT_COMPLETION and task_type != TaskType.CONVERSATIONAL: