From 0a9c24cce23e94d52b853d906e466f76cc8408ac Mon Sep 17 00:00:00 2001 From: Richard Li Date: Thu, 2 Nov 2023 21:21:02 -0700 Subject: [PATCH] support json columns like chat_history (#1628) * support json columns like chat_history * still drop chat_history due to mltable limit * update preprocessor version * update test * remove trailing whitespace * add doc string --------- Co-authored-by: Richard Li --- .../spec.yaml | 2 +- .../model_data_collector_preprocessor/run.py | 39 ++-- .../src/shared_utilities/constants.py | 1 + .../2023/10/24/22/mdc_chat_history.jsonl | 3 + .../2023/10/30/16/mdc_data_chat_history.jsonl | 4 + .../tests/unit/test_mdc_preprocessor.py | 219 +++++++++++++++++- 6 files changed, 241 insertions(+), 27 deletions(-) create mode 100644 assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/24/22/mdc_chat_history.jsonl create mode 100644 assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/30/16/mdc_data_chat_history.jsonl diff --git a/assets/model_monitoring/components/model_monitor/model_data_collector_preprocessor/spec.yaml b/assets/model_monitoring/components/model_monitor/model_data_collector_preprocessor/spec.yaml index 2b77215299..11f4459c34 100644 --- a/assets/model_monitoring/components/model_monitor/model_data_collector_preprocessor/spec.yaml +++ b/assets/model_monitoring/components/model_monitor/model_data_collector_preprocessor/spec.yaml @@ -4,7 +4,7 @@ type: spark name: model_data_collector_preprocessor display_name: Model Data Collector - Preprocessor description: Filters the data based on the window provided. -version: 0.3.5 +version: 0.3.6 is_deterministic: true code: ../../src diff --git a/assets/model_monitoring/components/src/model_data_collector_preprocessor/run.py b/assets/model_monitoring/components/src/model_data_collector_preprocessor/run.py index e4c0ed233f..2b648c4926 100644 --- a/assets/model_monitoring/components/src/model_data_collector_preprocessor/run.py +++ b/assets/model_monitoring/components/src/model_data_collector_preprocessor/run.py @@ -15,7 +15,7 @@ from fsspec import AbstractFileSystem from azureml.fsspec import AzureMachineLearningFileSystem from datetime import datetime -from pyspark.sql.functions import col, lit +from pyspark.sql.functions import lit, col from shared_utilities.event_utils import post_warning_event from shared_utilities.io_utils import ( init_spark, @@ -27,7 +27,8 @@ MDC_CHAT_HISTORY_COLUMN, MDC_CORRELATION_ID_COLUMN, MDC_DATA_COLUMN, - MDC_DATAREF_COLUMN + MDC_DATAREF_COLUMN, + SCHEMA_INFER_ROW_COUNT ) from typing import Tuple @@ -158,7 +159,7 @@ def _get_data_columns(df: DataFrame) -> list: return columns -def _extract_data_and_correlation_id(df: DataFrame, extract_correlation_id: bool, datastore: str) -> DataFrame: +def _extract_data_and_correlation_id(df: DataFrame, extract_correlation_id: bool, datastore: str = None) -> DataFrame: """ Extract data and correlation id from the MDC logs. @@ -166,7 +167,7 @@ def _extract_data_and_correlation_id(df: DataFrame, extract_correlation_id: bool otherwise, return the dataref content which is a url to the json file. """ - def read_data(row): + def read_data(row) -> str: data = getattr(row, MDC_DATA_COLUMN, None) if data: return data @@ -178,12 +179,17 @@ def read_data(row): return data_url # TODO: Move this to tracking stream if both data and dataref are NULL - # Output MLTable + def row_to_pdf(row) -> pd.DataFrame: + return pd.read_json(read_data(row)) + data_columns = _get_data_columns(df) - first_data_row = df.select(data_columns).rdd.map(lambda x: x).first() + data_rows = df.select(data_columns).rdd.take(SCHEMA_INFER_ROW_COUNT) # TODO: make it an argument user can define spark = init_spark() - data_as_df = spark.createDataFrame(pd.read_json(read_data(first_data_row))) + infer_pdf = pd.concat([row_to_pdf(row) for row in data_rows], ignore_index=True) + data_as_df = spark.createDataFrame(infer_pdf) + # data_as_df.show() + # data_as_df.printSchema() # The temporary workaround to remove the chat_history column if it exists. # We are removing the column because the pyspark DF is unable to parse it. @@ -191,6 +197,15 @@ def read_data(row): if MDC_CHAT_HISTORY_COLUMN in data_as_df.columns: data_as_df = data_as_df.drop(col(MDC_CHAT_HISTORY_COLUMN)) + def extract_data_and_correlation_id(entry, correlationid): + result = pd.read_json(entry) + result[MDC_CORRELATION_ID_COLUMN] = "" + for index, row in result.iterrows(): + result.loc[index, MDC_CORRELATION_ID_COLUMN] = ( + correlationid + "_" + str(index) + ) + return result + def tranform_df_function_with_correlation_id(iterator): for df in iterator: yield pd.concat( @@ -201,15 +216,6 @@ def tranform_df_function_with_correlation_id(iterator): for row in df.itertuples() ) - def extract_data_and_correlation_id(entry, correlationid): - result = pd.read_json(entry) - result[MDC_CORRELATION_ID_COLUMN] = "" - for index, row in result.iterrows(): - result.loc[index, MDC_CORRELATION_ID_COLUMN] = ( - correlationid + "_" + str(index) - ) - return result - def transform_df_function_without_correlation_id(iterator): for df in iterator: yield pd.concat( @@ -247,6 +253,7 @@ def _raw_mdc_uri_folder_to_preprocessed_spark_df( df = _convert_mltable_to_spark_df(table, preprocessed_input_data, fs) # print("df after converting mltable to spark df:") # df.show() + # df.printSchema() if not df: print("Skipping the Model Data Collector preprocessor.") diff --git a/assets/model_monitoring/components/src/shared_utilities/constants.py b/assets/model_monitoring/components/src/shared_utilities/constants.py index 450d331d72..ddddef911d 100644 --- a/assets/model_monitoring/components/src/shared_utilities/constants.py +++ b/assets/model_monitoring/components/src/shared_utilities/constants.py @@ -90,3 +90,4 @@ MDC_CORRELATION_ID_COLUMN = 'correlationid' MDC_DATA_COLUMN = 'data' MDC_DATAREF_COLUMN = 'dataref' +SCHEMA_INFER_ROW_COUNT = 10 diff --git a/assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/24/22/mdc_chat_history.jsonl b/assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/24/22/mdc_chat_history.jsonl new file mode 100644 index 0000000000..6b27fa25f0 --- /dev/null +++ b/assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/24/22/mdc_chat_history.jsonl @@ -0,0 +1,3 @@ +{"specversion":"1.0","id":"82c8f4c4-8c40-4c14-8a98-55a957004924","source":"/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourceGroups/copilot-demo-eastus/providers/Microsoft.MachineLearningServices/workspaces/aistudio_townhall/onlineEndpoints/aistudio-townhall-kuxtz/deployments/aistudio-townhall-kuxtz-1","type":"azureml.inference.model_inputs","datacontenttype":"application/json","time":"2023-10-24T22:20:24Z","data":[{"question":"price of trail shoes ?","chat_history":[]}],"contentrange":"bytes 0-56/57","correlationid":"6c4861f8-fa33-42ff-bc93-cf8f459ee013","xrequestid":"6c4861f8-fa33-42ff-bc93-cf8f459ee013","modelversion":"default","collectdatatype":"pandas.core.frame.DataFrame","agent":"azureml-ai-monitoring/0.1.0b3"} +{"specversion":"1.0","id":"8b52369e-7fb9-454b-a8f5-6c845137fd5f","source":"/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourceGroups/copilot-demo-eastus/providers/Microsoft.MachineLearningServices/workspaces/aistudio_townhall/onlineEndpoints/aistudio-townhall-kuxtz/deployments/aistudio-townhall-kuxtz-1","type":"azureml.inference.model_inputs","datacontenttype":"application/json","time":"2023-10-24T22:24:55Z","data":[{"question":"What is the proper care for trailwalker hiking shoes?","chat_history":[{"inputs":{"question":"How much does TrailWalker Hiking Shoes cost? "},"outputs":{"output":"The TrailWalker Hiking Shoes cost $110. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}}]}],"modelversion":"default","collectdatatype":"pandas.core.frame.DataFrame","agent":"azureml-ai-monitoring/0.1.0b3","contentrange":"bytes 0-384/385","correlationid":"7415b7a9-47d5-44bd-8d50-8762ff542d2a","xrequestid":"7415b7a9-47d5-44bd-8d50-8762ff542d2a"} +{"specversion":"1.0","id":"c26e3d0f-2f2d-42af-949a-d267e0318824","source":"/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourceGroups/copilot-demo-eastus/providers/Microsoft.MachineLearningServices/workspaces/aistudio_townhall/onlineEndpoints/aistudio-townhall-kuxtz/deployments/aistudio-townhall-kuxtz-1","type":"azureml.inference.model_inputs","datacontenttype":"application/json","time":"2023-10-24T22:26:11Z","data":[{"question":"What is the material for PowerBurner Camping Stove?","chat_history":[{"inputs":{"question":"How much does TrailWalker Hiking Shoes cost? "},"outputs":{"output":"The TrailWalker Hiking Shoes cost $110. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What is the proper care for trailwalker hiking shoes?"},"outputs":{"output":"According to the product information, proper care and maintenance for the TrailWalker Hiking Shoes include removing any dirt or debris after each use, rinsing them with clean water and gently scrubbing with a soft brush if they are muddy or heavily soiled, allowing them to air dry naturally, periodically applying a waterproofing treatment, inspecting them regularly for any signs of wear and tear, and storing them in a cool, dry place when not in use. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What brand is for TrailMaster tent? "},"outputs":{"output":"The brand for the TrailMaster X4 Tent is OutdoorLiving. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"How do I carry the TrailMaster tent around?"},"outputs":{"output":"The TrailMaster X4 Tent comes with a carry bag for easy transport. When packed in its carry bag, it can be comfortably carried during hikes. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"What is the floor area for Floor Area?"},"outputs":{"output":"The floor area for the TrailMaster X4 Tent is 80 square feet. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"What is the material for TrailBlaze Hiking Pants"},"outputs":{"output":"The TrailBlaze Hiking Pants are made of high-quality nylon fabric. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}},{"inputs":{"question":"What color does TrailBlaze Hiking Pants come in"},"outputs":{"output":"The TrailBlaze Hiking Pants come in khaki color. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}},{"inputs":{"question":"Cant he warrenty for TrailBlaze pants be transfered?"},"outputs":{"output":"No, the warranty for the TrailBlaze Hiking Pants is non-transferable and applies only to the original purchaser of the product. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}},{"inputs":{"question":"How long are the TrailBlaze pants under warrenty for?"},"outputs":{"output":"The TrailBlaze Hiking Pants are covered by a limited warranty for 1 year from the date of purchase. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}}]}],"agent":"azureml-ai-monitoring/0.1.0b3","contentrange":"bytes 0-3511/3512","correlationid":"f313a05b-5f84-449e-a6c6-046ef2f39409","xrequestid":"f313a05b-5f84-449e-a6c6-046ef2f39409","modelversion":"default","collectdatatype":"pandas.core.frame.DataFrame"} diff --git a/assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/30/16/mdc_data_chat_history.jsonl b/assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/30/16/mdc_data_chat_history.jsonl new file mode 100644 index 0000000000..5cd57bee7d --- /dev/null +++ b/assets/model_monitoring/components/tests/unit/raw_mdc_data/2023/10/30/16/mdc_data_chat_history.jsonl @@ -0,0 +1,4 @@ +{"specversion":"1.0","id":"d77a23be-583d-402b-ba34-ffc11b8a9de2","source":"/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourceGroups/copilot-demo-eastus/providers/Microsoft.MachineLearningServices/workspaces/aistudio_townhall/onlineEndpoints/aistudio-townhall-kuxtz/deployments/aistudio-townhall-kuxtz-1","type":"azureml.inference.model_inputs","datacontenttype":"application/json","time":"2023-10-24T22:24:45Z","data":[{"question":"How much does TrailWalker Hiking Shoes cost? ","chat_history":[]}],"agent":"azureml-ai-monitoring/0.1.0b3","contentrange":"bytes 0-79/80","correlationid":"061a700f-32f2-4a1a-8166-a1faee9c0669","xrequestid":"061a700f-32f2-4a1a-8166-a1faee9c0669","modelversion":"default","collectdatatype":"pandas.core.frame.DataFrame"}, +{"specversion":"1.0","id":"16be97f0-8ef7-4323-94d3-7dbd4f5dee33","source":"/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourceGroups/copilot-demo-eastus/providers/Microsoft.MachineLearningServices/workspaces/aistudio_townhall/onlineEndpoints/aistudio-townhall-kuxtz/deployments/aistudio-townhall-kuxtz-1","type":"azureml.inference.model_inputs","datacontenttype":"application/json","time":"2023-10-24T22:25:21Z","data":[{"question":"What is the floor area for Floor Area?","chat_history":[{"inputs":{"question":"How much does TrailWalker Hiking Shoes cost? "},"outputs":{"output":"The TrailWalker Hiking Shoes cost $110. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What is the proper care for trailwalker hiking shoes?"},"outputs":{"output":"According to the product information, proper care and maintenance for the TrailWalker Hiking Shoes include removing any dirt or debris after each use, rinsing them with clean water and gently scrubbing with a soft brush if they are muddy or heavily soiled, allowing them to air dry naturally, periodically applying a waterproofing treatment, inspecting them regularly for any signs of wear and tear, and storing them in a cool, dry place when not in use. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What brand is for TrailMaster tent? "},"outputs":{"output":"The brand for the TrailMaster X4 Tent is OutdoorLiving. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"How do I carry the TrailMaster tent around?"},"outputs":{"output":"The TrailMaster X4 Tent comes with a carry bag for easy transport. When packed in its carry bag, it can be comfortably carried during hikes. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}}]}],"xrequestid":"bd7cfa70-4283-44bb-9f21-3c99efbda7df","collectdatatype":"pandas.core.frame.DataFrame","agent":"azureml-ai-monitoring/0.1.0b3","modelversion":"default","contentrange":"bytes 0-1790/1791","correlationid":"bd7cfa70-4283-44bb-9f21-3c99efbda7df"} +{"specversion":"1.0","id":"12b1bf45-e865-4a95-97d5-257735b32951","source":"/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourceGroups/copilot-demo-eastus/providers/Microsoft.MachineLearningServices/workspaces/aistudio_townhall/onlineEndpoints/aistudio-townhall-kuxtz/deployments/aistudio-townhall-kuxtz-1","type":"azureml.inference.model_inputs","datacontenttype":"application/json","time":"2023-10-24T22:25:45Z","data":[{"question":"Cant he warrenty for TrailBlaze pants be transfered?","chat_history":[{"inputs":{"question":"How much does TrailWalker Hiking Shoes cost? "},"outputs":{"output":"The TrailWalker Hiking Shoes cost $110. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What is the proper care for trailwalker hiking shoes?"},"outputs":{"output":"According to the product information, proper care and maintenance for the TrailWalker Hiking Shoes include removing any dirt or debris after each use, rinsing them with clean water and gently scrubbing with a soft brush if they are muddy or heavily soiled, allowing them to air dry naturally, periodically applying a waterproofing treatment, inspecting them regularly for any signs of wear and tear, and storing them in a cool, dry place when not in use. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What brand is for TrailMaster tent? "},"outputs":{"output":"The brand for the TrailMaster X4 Tent is OutdoorLiving. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"How do I carry the TrailMaster tent around?"},"outputs":{"output":"The TrailMaster X4 Tent comes with a carry bag for easy transport. When packed in its carry bag, it can be comfortably carried during hikes. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"What is the floor area for Floor Area?"},"outputs":{"output":"The floor area for the TrailMaster X4 Tent is 80 square feet. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"What is the material for TrailBlaze Hiking Pants"},"outputs":{"output":"The TrailBlaze Hiking Pants are made of high-quality nylon fabric. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}},{"inputs":{"question":"What color does TrailBlaze Hiking Pants come in"},"outputs":{"output":"The TrailBlaze Hiking Pants come in khaki color. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}}]}],"modelversion":"default","collectdatatype":"pandas.core.frame.DataFrame","contentrange":"bytes 0-2753/2754","correlationid":"2df66a86-2c36-42d1-8186-d8071357c1c3","xrequestid":"2df66a86-2c36-42d1-8186-d8071357c1c3","agent":"azureml-ai-monitoring/0.1.0b3"} +{"specversion":"1.0","id":"918f455d-171d-4589-9320-994176c55bcd","source":"/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourceGroups/copilot-demo-eastus/providers/Microsoft.MachineLearningServices/workspaces/aistudio_townhall/onlineEndpoints/aistudio-townhall-kuxtz/deployments/aistudio-townhall-kuxtz-1","type":"azureml.inference.model_inputs","datacontenttype":"application/json","time":"2023-10-24T22:26:00Z","data":[{"question":"How long are the TrailBlaze pants under warrenty for?","chat_history":[{"inputs":{"question":"How much does TrailWalker Hiking Shoes cost? "},"outputs":{"output":"The TrailWalker Hiking Shoes cost $110. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What is the proper care for trailwalker hiking shoes?"},"outputs":{"output":"According to the product information, proper care and maintenance for the TrailWalker Hiking Shoes include removing any dirt or debris after each use, rinsing them with clean water and gently scrubbing with a soft brush if they are muddy or heavily soiled, allowing them to air dry naturally, periodically applying a waterproofing treatment, inspecting them regularly for any signs of wear and tear, and storing them in a cool, dry place when not in use. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_11.md)"}},{"inputs":{"question":"What brand is for TrailMaster tent? "},"outputs":{"output":"The brand for the TrailMaster X4 Tent is OutdoorLiving. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"How do I carry the TrailMaster tent around?"},"outputs":{"output":"The TrailMaster X4 Tent comes with a carry bag for easy transport. When packed in its carry bag, it can be comfortably carried during hikes. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"What is the floor area for Floor Area?"},"outputs":{"output":"The floor area for the TrailMaster X4 Tent is 80 square feet. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_1.md)"}},{"inputs":{"question":"What is the material for TrailBlaze Hiking Pants"},"outputs":{"output":"The TrailBlaze Hiking Pants are made of high-quality nylon fabric. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}},{"inputs":{"question":"What color does TrailBlaze Hiking Pants come in"},"outputs":{"output":"The TrailBlaze Hiking Pants come in khaki color. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}},{"inputs":{"question":"Cant he warrenty for TrailBlaze pants be transfered?"},"outputs":{"output":"No, the warranty for the TrailBlaze Hiking Pants is non-transferable and applies only to the original purchaser of the product. (Source: azureml:\/\/locations\/eastus\/workspaces\/41f8d98d-f5e7-4bb3-8f74-b1f6a33acb5b\/data\/vector-index-input-1697916790707\/versions\/1\/product_info_10.md)"}}]}],"contentrange":"bytes 0-3147/3148","correlationid":"89839ad3-8a0c-4be8-9290-2e0ea67f9576","xrequestid":"89839ad3-8a0c-4be8-9290-2e0ea67f9576","modelversion":"default","collectdatatype":"pandas.core.frame.DataFrame","agent":"azureml-ai-monitoring/0.1.0b3"} \ No newline at end of file diff --git a/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py b/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py index f9601f7be1..fecf1017ed 100644 --- a/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py +++ b/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py @@ -3,10 +3,13 @@ """test class for mdc preprocessor.""" +from pyspark.sql import SparkSession +from pyspark.sql.types import StructField, StringType, DoubleType, LongType, MapType import pytest from unittest.mock import Mock import fsspec import shutil +import json import os import sys import spark_mltable # noqa, to enable spark.read.mltable @@ -14,6 +17,7 @@ from pandas.testing import assert_frame_equal from model_data_collector_preprocessor.run import ( _raw_mdc_uri_folder_to_preprocessed_spark_df, + _extract_data_and_correlation_id, mdc_preprocessor, _convert_to_azureml_long_form, _get_datastore_from_input_path, @@ -26,6 +30,14 @@ def mdc_preprocessor_test_setup(): original_work_dir = os.getcwd() momo_work_dir = os.path.abspath(f"{os.path.dirname(__file__)}/../..") os.chdir(momo_work_dir) # change working directory to root of the assets/model_monitoring_components + python_path = sys.executable + os.environ["PYSPARK_PYTHON"] = python_path + print("PYSPARK_PYTHON", os.environ.get("PYSPARK_PYTHON", "NA")) + module_path = f"{os.getcwd()}/src" + old_python_path = os.environ.get("PYTHONPATH", None) + old_python_path = f"{old_python_path};" if old_python_path else "" + os.environ["PYTHONPATH"] = f"{old_python_path}{module_path}" + print("PYTHONPATH:", os.environ.get("PYTHONPATH", "NA")) yield os.chdir(original_work_dir) # change working directory back to original @@ -54,14 +66,6 @@ def test_uri_folder_to_spark_df(self, mdc_preprocessor_test_setup, """Test uri_folder_to_spark_df().""" print("testing test_uri_folder_to_spark_df...") print("working dir:", os.getcwd()) - python_path = sys.executable - os.environ["PYSPARK_PYTHON"] = python_path - print("PYSPARK_PYTHON", os.environ.get("PYSPARK_PYTHON", "NA")) - module_path = f"{os.getcwd()}/src" - old_python_path = os.environ.get("PYTHONPATH", None) - old_python_path = f"{old_python_path};" if old_python_path else "" - os.environ["PYTHONPATH"] = f"{old_python_path}{module_path}" - print("PYTHONPATH:", os.environ.get("PYTHONPATH", "NA")) fs = fsspec.filesystem("file") tests_path = os.path.abspath(f"{os.path.dirname(__file__)}/../../tests") @@ -81,9 +85,9 @@ def test_uri_folder_to_spark_df(self, mdc_preprocessor_test_setup, pdf_actual = sdf.toPandas() pdf_expected = pd.DataFrame({ - 'sepal_length': [1, 2, 3, 1], + 'sepal_length': [1, 2, 3, 1.5], 'sepal_width': [2.3, 3.2, 3.4, 1.0], - 'petal_length': [2, 3, 3, 4], + 'petal_length': [2, 3, 3.2, 4], 'petal_width': [1.3, 1.5, 1.8, 1.6] }) if extract_correlation_id: @@ -101,6 +105,41 @@ def test_uri_folder_to_spark_df(self, mdc_preprocessor_test_setup, assert_frame_equal(pdf_actual, pdf_expected) + @pytest.mark.skip(reason="can't set PYTHONPATH for executor in remote run.") + @pytest.mark.parametrize( + "window_start_time, window_end_time, extract_correlation_id", + [ + # chat history + ("2023-10-30T16:00:00", "2023-10-30T17:00:00", False), + ("2023-10-30T16:00:00", "2023-10-30T17:00:00", True), + # ("2023-10-24T22:00:00", "2023-10-24T23:00:00", False), + # ("2023-10-24T22:00:00", "2023-10-24T23:00:00", True), + ] + ) + def test_uri_folder_to_spark_df_with_chat_history( + self, mdc_preprocessor_test_setup, + window_start_time, window_end_time, extract_correlation_id): + """Test uri_folder_to_spark_df() with chat_history column.""" + print("testing test_uri_folder_to_spark_df...") + print("working dir:", os.getcwd()) + + fs = fsspec.filesystem("file") + tests_path = os.path.abspath(f"{os.path.dirname(__file__)}/../../tests") + preprocessed_output = f"{tests_path}/unit/preprocessed_mdc_data" + shutil.rmtree(f"{preprocessed_output}temp", True) + + sdf = _raw_mdc_uri_folder_to_preprocessed_spark_df( + window_start_time, + window_end_time, + f"{tests_path}/unit/raw_mdc_data/", + preprocessed_output, + extract_correlation_id, + fs, + ) + print("preprocessed dataframe:") + sdf.show(truncate=False) + # todo: assert dataframe content + @pytest.mark.skip(reason="spark write is not ready in local") def test_mdc_preprocessor(self, mdc_preprocessor_test_setup): """Test mdc_preprocessor().""" @@ -118,6 +157,166 @@ def test_mdc_preprocessor(self, mdc_preprocessor_test_setup): fs, ) + @pytest.mark.skip(reason="can't set PYTHONPATH for executor in remote run.") + @pytest.mark.parametrize( + "data, expected_pdf, expected_fields", + [ + # single input in each row + ( + [ + [json.dumps([{"f0": "v0", "f1": 1, "f2": 2}]), "cid0"], + [json.dumps([{"f0": "v1", "f1": 1.2, "f2": 3}]), "cid1"], + [json.dumps([{"f0": "v2", "f1": 2.3, "f2": 4}]), "cid2"], + ], + pd.DataFrame([ + {"f0": "v0", "f1": 1.0, "f2": 2, "correlationid": "cid0_0"}, + {"f0": "v1", "f1": 1.2, "f2": 3, "correlationid": "cid1_0"}, + {"f0": "v2", "f1": 2.3, "f2": 4, "correlationid": "cid2_0"}, + ]), + [ + StructField("f0", StringType()), StructField("f1", DoubleType()), StructField("f2", LongType()), + # StructField("correlationid", StringType(), False) + ] + ), + # multiple inputs in one row + ( + [ + [json.dumps([{"f0": "v0", "f1": 1, "f2": 2}, + {"f0": "v3", "f1": 1.5, "f2": 5}]), "cid0"], + [json.dumps([{"f0": "v1", "f1": 2, "f2": 3}]), "cid1"], + [json.dumps([{"f0": "v2", "f1": 3, "f2": 4}]), "cid2"], + ], + pd.DataFrame([ + {"f0": "v0", "f1": 1.0, "f2": 2, "correlationid": "cid0_0"}, + {"f0": "v3", "f1": 1.5, "f2": 5, "correlationid": "cid0_1"}, + {"f0": "v1", "f1": 2, "f2": 3, "correlationid": "cid1_0"}, + {"f0": "v2", "f1": 3, "f2": 4, "correlationid": "cid2_0"}, + ]), + [ + StructField("f0", StringType()), StructField("f1", DoubleType()), StructField("f2", LongType()), + # StructField("correlationid", StringType(), False) + ] + ), + # struct fields + ( + [ + [json.dumps([{"simple_field": "v0", "struct_field": {"f0": "t0", "f1": "u0", "f2": "w0"}}]), "cid0"], # noqa + [json.dumps([{"simple_field": "v1", "struct_field": {"f0": "t1", "f1": "u1"}}, + {"simple_field": "v2", "struct_field": {"f0": "t2", "f1": "u2", "f2": "w2"}}]), "cid1"], # noqa + [json.dumps([{"simple_field": "v3", "struct_field": {"f0": "t3", "f2": "w3"}}]), "cid2"], # noqa + ], + pd.DataFrame([ + {"simple_field": "v0", "struct_field": {"f0": "t0", "f1": "u0", "f2": "w0"}, "correlationid": "cid0_0"}, # noqa + {"simple_field": "v1", "struct_field": {"f0": "t1", "f1": "u1"}, "correlationid": "cid1_0"}, # noqa + {"simple_field": "v2", "struct_field": {"f0": "t2", "f1": "u2", "f2": "w2"}, "correlationid": "cid1_1"}, # noqa + {"simple_field": "v3", "struct_field": {"f0": "t3", "f2": "w3"}, "correlationid": "cid2_0"}, # noqa + ]), + [ + StructField("simple_field", StringType()), + StructField("struct_field", MapType(StringType(), StringType())), + # StructField("correlationid", StringType(), False) + ] + ), + # chat history + ( + [ + [ + json.dumps([{"question": "q0", "chat_history": []}]), + "cid0" + ], + [ + json.dumps([ + { + "question": "q1", + "chat_history": [ + { + "inputs": {"question": "q0"}, + "outputs": {"output": "o0"}, + } + ] + } + ]), + "cid1" + ], + [ + json.dumps([ + { + "question": "q2", + "chat_history": [ + { + "inputs": {"question": "q0"}, + "outputs": {"output": "o0"}, + }, + { + "inputs": {"question": "q1"}, + "outputs": {"output": "o1"}, + } + ] + } + ]), + "cid2" + ], + ], + pd.DataFrame([ + {"question": "q0", "chat_history": [], "correlationid": "cid0_0"}, + { + "question": "q1", + "chat_history": [ + { + "inputs": {"question": "q0"}, + "outputs": {"output": "o0"}, + } + ], + "correlationid": "cid1_0" + }, + { + "question": "q2", + "chat_history": [ + { + "inputs": {"question": "q0"}, + "outputs": {"output": "o0"}, + }, + { + "inputs": {"question": "q1"}, + "outputs": {"output": "o1"}, + } + ], + "correlationid": "cid2_0" + } + ]), + [ + StructField("question", StringType()), + # StructField('chat_history', + # ArrayType(MapType(StringType(), MapType(StringType(), StringType())))), + # StructField("correlationid", StringType(), False) + ] + ) + ] + ) + def test_extract_data_and_correlation_id(self, mdc_preprocessor_test_setup, + data, expected_pdf, expected_fields): + """Test _extract_data_and_correlation_id().""" + spark = SparkSession.builder.appName("test_extract_data_and_correlation_id").getOrCreate() + expected_pdf.drop(columns=["chat_history"], inplace=True) + extract_correlation_ids = [True, False] + for extract_correlation_id in extract_correlation_ids: + in_df = spark.createDataFrame(data, ["data", "correlationid"]) + out_df = _extract_data_and_correlation_id(in_df, extract_correlation_id) + out_df.show(truncate=False) + out_df.printSchema() + fields = out_df.schema.fields + for field in expected_fields: + assert field in fields + expected_pdf_ = expected_pdf + if extract_correlation_id: + assert StructField("correlationid", StringType(), False) in fields + else: + expected_pdf_ = expected_pdf.drop(columns=["correlationid"], inplace=False) + actual_pdf = out_df.toPandas() + assert_frame_equal(actual_pdf, expected_pdf_) + + # assert False + @pytest.mark.parametrize( "url_str, converted", [