From e8eeff8122929b731b70ec56372a778d966144dc Mon Sep 17 00:00:00 2001
From: Ankush Bhatia
Date: Tue, 19 Nov 2024 15:35:42 +0530
Subject: [PATCH 1/2] Model Prediction Bug Fix

---
 .../components/compute_metrics/spec.yaml         |  2 +-
 .../components/model_prediction/spec.yaml        |  2 +-
 .../components/pipeline_component/spec.yaml      |  8 +--
 .../components/schedule_component/spec.yaml      | 50 +++++++++++++++++++
 .../spec.yaml                                    |  2 +-
 .../task_factory/tabular/classification.py       |  2 +-
 6 files changed, 58 insertions(+), 8 deletions(-)
 create mode 100644 assets/training/model_evaluation/components/schedule_component/spec.yaml

diff --git a/assets/training/model_evaluation/components/compute_metrics/spec.yaml b/assets/training/model_evaluation/components/compute_metrics/spec.yaml
index 6a234fbf5e..120aea6af1 100644
--- a/assets/training/model_evaluation/components/compute_metrics/spec.yaml
+++ b/assets/training/model_evaluation/components/compute_metrics/spec.yaml
@@ -3,7 +3,7 @@ name: compute_metrics
 display_name: Compute Metrics
 description: Calculate model performance metrics, given ground truth and prediction data.
 
-version: 0.0.33
+version: 0.0.35
 type: command
 tags:
   type: evaluation
diff --git a/assets/training/model_evaluation/components/model_prediction/spec.yaml b/assets/training/model_evaluation/components/model_prediction/spec.yaml
index 472aa2dc78..b23b302e63 100644
--- a/assets/training/model_evaluation/components/model_prediction/spec.yaml
+++ b/assets/training/model_evaluation/components/model_prediction/spec.yaml
@@ -3,7 +3,7 @@ name: model_prediction
 display_name: Model Prediction
 description: Generate predictions on a given mlflow model for supported tasks.
 
-version: 0.0.34
+version: 0.0.35
 type: command
 tags:
   type: evaluation
diff --git a/assets/training/model_evaluation/components/pipeline_component/spec.yaml b/assets/training/model_evaluation/components/pipeline_component/spec.yaml
index aa85cb9751..7849d8c1f9 100644
--- a/assets/training/model_evaluation/components/pipeline_component/spec.yaml
+++ b/assets/training/model_evaluation/components/pipeline_component/spec.yaml
@@ -1,6 +1,6 @@
 $schema: https://azuremlschemas.azureedge.net/latest/pipelineComponent.schema.json
 name: model_evaluation_pipeline
-version: 0.0.33
+version: 0.0.35
 type: pipeline
 display_name: Model Evaluation Pipeline
 description: Pipeline component for model evaluation for supported tasks.
@@ -87,7 +87,7 @@ outputs:
 jobs:
   validation_trigger_model_evaluation:
     type: command
-    component: azureml:validation_trigger_model_evaluation:0.0.33
+    component: azureml:validation_trigger_model_evaluation:0.0.35
     compute: '${{parent.inputs.compute_name}}'
     resources:
       instance_type: '${{parent.inputs.instance_type}}'
@@ -111,7 +111,7 @@ jobs:
 
   model_prediction:
     type: command
-    component: azureml:model_prediction:0.0.33
+    component: azureml:model_prediction:0.0.35
     compute: '${{parent.inputs.compute_name}}'
     resources:
       instance_type: '${{parent.inputs.instance_type}}'
@@ -128,7 +128,7 @@ jobs:
 
  compute_metrics:
    type: command
-    component: azureml:compute_metrics:0.0.33
+    component: azureml:compute_metrics:0.0.35
     compute: '${{parent.inputs.compute_name}}'
     resources:
       instance_type: '${{parent.inputs.instance_type}}'
diff --git a/assets/training/model_evaluation/components/schedule_component/spec.yaml b/assets/training/model_evaluation/components/schedule_component/spec.yaml
new file mode 100644
index 0000000000..eeafe7129f
--- /dev/null
+++ b/assets/training/model_evaluation/components/schedule_component/spec.yaml
@@ -0,0 +1,50 @@
+$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
+name: evaluation_trigger
+display_name: Evaluation for Trigger
+description: Evaluation job from trigger service
+
+version: 0.0.3
+type: command
+
+inputs:
+  connection_string:
+    type: string
+    optional: false
+    description: "Connection String"
+  resource_id:
+    type: string
+    optional: false
+    description: "Resource ID"
+  query:
+    type: string
+    description: "DB Query"
+  sampling_rate:
+    type: string
+    description: "Sampling Rate"
+  cron_expression:
+    type: string
+    description: "Cron Expression"
+  evaluators:
+    type: string
+    description: "Json Serialized Evaluators info."
+
+outputs:
+  preprocessed_data:
+    type: uri_file
+  evaluated_data:
+    type: uri_file
+
+is_deterministic: True
+environment: azureml://registries/azureml/environments/evaluations-built-in/versions/4
+
+command: >-
+  python /app/online_eval/evaluate_online.py compute_metrics.py
+  --connection_string '${{inputs.connection_string}}'
+  --resource_id '${{inputs.resource_id}}'
+  --query '${{inputs.query}}'
+  --sampling_rate '${{inputs.sampling_rate}}'
+  --preprocessor_connection_type managed-identity
+  --cron_expression '${{inputs.cron_expression}}'
+  --preprocessed_data '${{outputs.preprocessed_data}}'
+  --evaluators '${{inputs.evaluators}}'
+  --evaluated_data '${{outputs.evaluated_data}}'
diff --git a/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml b/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml
index 66e35207fb..873760529e 100644
--- a/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml
+++ b/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml
@@ -3,7 +3,7 @@ name: validation_trigger_model_evaluation
 display_name: Validation Trigger Model Evaluation
 description: Component for enabling validation of model evaluation pipeline.
 
-version: 0.0.33
+version: 0.0.35
 type: command
 tags:
   type: evaluation
diff --git a/assets/training/model_evaluation/src/task_factory/tabular/classification.py b/assets/training/model_evaluation/src/task_factory/tabular/classification.py
index 770c145728..9528aeb340 100644
--- a/assets/training/model_evaluation/src/task_factory/tabular/classification.py
+++ b/assets/training/model_evaluation/src/task_factory/tabular/classification.py
@@ -124,7 +124,7 @@ def predict(self, X_test, **kwargs):
             y_pred = predict_fn(X_test)
         except RuntimeError as e:
             logger.warning(f"RuntimeError exception raised. Reason: {e}")
-            y_pred, kwargs = self.handle_device_failure(X_test, **kwargs)
+            y_pred = self.handle_device_failure(X_test, **kwargs)
 
         if y_transformer is not None:
             y_pred = y_transformer.transform(y_pred).toarray()

From 6d8030a4d151e67de8de873f1525cd8f657ed997 Mon Sep 17 00:00:00 2001
From: Ankush Bhatia
Date: Thu, 28 Nov 2024 14:39:12 +0530
Subject: [PATCH 2/2] Fix Distributed Model Prediction code

---
 .../src_distributed/data_utils.py        |  2 +-
 .../src_distributed/model_prediction.py  | 26 +++++++++++--------
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/assets/training/model_evaluation/src_distributed/data_utils.py b/assets/training/model_evaluation/src_distributed/data_utils.py
index 9e74efd3c8..fd3e2acd5c 100644
--- a/assets/training/model_evaluation/src_distributed/data_utils.py
+++ b/assets/training/model_evaluation/src_distributed/data_utils.py
@@ -196,7 +196,7 @@ def prepare_chat_data_from_ft_pipeline(data: pd.DataFrame):
     X_test, y_test = {local_constants.LLM_FT_CHAT_COMPLETION_KEY:[]}, []
     for message in messages_col.to_list():
         X_test[local_constants.LLM_FT_CHAT_COMPLETION_KEY].append(message[:-1])
-        y_test.append(message[-1]["content"])
+        y_test.append([message[-1]["content"]])
     X_test = pd.DataFrame(X_test)
     y_test = pd.Series(y_test)
     return X_test, y_test.values
diff --git a/assets/training/model_evaluation/src_distributed/model_prediction.py b/assets/training/model_evaluation/src_distributed/model_prediction.py
index 7d819b9f6a..ef9dd3bcc3 100644
--- a/assets/training/model_evaluation/src_distributed/model_prediction.py
+++ b/assets/training/model_evaluation/src_distributed/model_prediction.py
@@ -89,6 +89,7 @@ def postprocess(self, result):
         """
         y_pred_df, y_test_df, perf_df, y_pred_proba_df = pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
         for y_pred, y_test, perf, pred_probas in result:
+            logger.info(f"Type here as well: {type(y_test)}")
             y_pred_df = pd.concat([y_pred_df, y_pred], axis=0)
             y_test_df = pd.concat([y_test_df, y_test], axis=0)
             perf_df = pd.concat([perf_df, perf], axis=0)
@@ -121,8 +122,8 @@ def _make_chat_completion_data(self, input_df, last_chats, col_name):
         input_rows = input_df.values.tolist()
         for ind, datarow in enumerate(input_rows):
             conversation = datarow[0]
-            conversation.append({"role":"assistant", "content":last_chats[ind]})
-            appended_data[col_name].append(conversation)
+            updated_conversation = conversation + [{"role":"assistant", "content":last_chats[ind]}]
+            appended_data[col_name].append(updated_conversation)
 
         return pd.DataFrame(appended_data)
 
@@ -183,6 +184,7 @@ def predict_single(self, data):
         end_ms = time.time() * 1000
         outputs = [res.response for i, res in enumerate(inference_results)]
         pred_probas = [res.scores for res in inference_results]
+
         perf_data = [{
             PerformanceColumns.BATCH_SIZE_COLUMN_NAME: len(input_texts),
             PerformanceColumns.START_TIME_COLUMN_NAME: datetime.fromtimestamp(start_ms / 1000, timezone.utc).isoformat(),
@@ -195,12 +197,15 @@ def predict_single(self, data):
         } for gt, pred in zip(input_texts, outputs)]
         pred_proba_df = pd.DataFrame(pred_probas, index=X_test.index)
         perf_data = pd.DataFrame(perf_data)
+
         if self.task_type == SupportedTask.CHAT_COMPLETION or self.task_type == TaskType.CONVERSATIONAL:
-            pred_df = self._make_chat_completion_data(X_test, outputs,
-                                                      col_name=ChatCompletionConstants.OUTPUT_FULL_CONVERSATION)
+            pred_df = self._make_chat_completion_data(X_test.copy(deep=True), outputs,
+                                                      col_name=ChatCompletionConstants.OUTPUT_FULL_CONVERSATION, debug=False)
             pred_df[ChatCompletionConstants.OUTPUT] = outputs
-            y_test = self._make_chat_completion_data(X_test, y_test, col_name="ground_truth")
+            y_test = pd.DataFrame(y_test, columns=["ground_truth"], index=X_test.index)
+            # y_test = self._make_chat_completion_data(X_test.copy(deep=True), y_test, col_name="ground_truth")
             return pred_df, y_test, perf_data, pred_proba_df
+
         pred_df = pd.DataFrame(outputs, index=X_test.index, columns=["prediction"])
         if isinstance(y_test, pd.Series):
             y_test = y_test.to_frame()
@@ -460,15 +465,11 @@ def main():
     data_path = args.data
     logger.info(f"Torch Current Device Count:{torch.cuda.device_count()}")
 
-    logger.info(f"Got Params: {args.parameters}")
-    logger.info(f"Params type: {type(args.parameters)}")
-    #logger.info(f"Evaled params: {eval(args.parameters)}")
     extra_params.update(json.loads(args.parameters))
+
     logger.info(f"Got Model Path: {args.mlflow_model}")
 
-    task_type = args.task
-
     input_column_names, label_column_name, extra_y_test_cols = validate_and_get_columns(vars(args))
 
     try:
@@ -552,6 +553,9 @@ def main():
             raise exception
     full_data = [(x, y) for x, y in data]
     logger.info(f"Dataset size: {len(full_data)}")
+    # TODO Remove this line in Prod:
+    # logger.info("First few input rows")
+    # logger.info(f"{full_data[0][0].head()}")
     predictor = Predictor(g_fmscorer, task_type, extra_params, num_replicas, label_column_name, tokenizer, extra_y_test_cols)
     collated_res = [{} for i in range(distributed_state.num_processes)]
     with distributed_state.split_between_processes(full_data) as proc_data:
@@ -563,7 +567,7 @@ def main():
     logger.info("Waiting for all processes.....")
     distributed_state.wait_for_everyone()
     logger.info(f"Collated Results Lengths: {[len(i) for i in collated_res]}")
-    logger.info(f"Type of each key: {[(k, type(v), len(v)) for k, v in collated_res[0].items()]}")
+    # logger.info(f"Type of each key: {[(k, type(v), len(v)) for k, v in collated_res[0].items()]}")
     y_pred_df, y_test_df, y_perf_df, y_pred_proba_df = _gather_predictions(collated_res)
 
     if task_type != SupportedTask.CHAT_COMPLETION and task_type != TaskType.CONVERSATIONAL: