diff --git a/assets/model_monitoring/components/src/model_monitor_evaluate_metrics_threshold/evaluate_metrics_threshold.py b/assets/model_monitoring/components/src/model_monitor_evaluate_metrics_threshold/evaluate_metrics_threshold.py
index 0e31ef98c6..f039d10c72 100644
--- a/assets/model_monitoring/components/src/model_monitor_evaluate_metrics_threshold/evaluate_metrics_threshold.py
+++ b/assets/model_monitoring/components/src/model_monitor_evaluate_metrics_threshold/evaluate_metrics_threshold.py
@@ -25,6 +25,10 @@
 )
 from pyspark.sql import DataFrame
 import pyspark.sql.functions as F
+from pyspark.sql.types import (
+    StringType,
+    DoubleType
+)


 # For the list of metrics, the users expect the value should be greater than the threshold,
@@ -72,13 +76,15 @@ def _clean_metrics_df(metrics_df: DataFrame) -> DataFrame:
         F.col(SIGNAL_METRICS_METRIC_VALUE).isNotNull()
     )

-    is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
-        F.col(SIGNAL_METRICS_THRESHOLD_VALUE) != F.lit("")
-    )
+    if is_not_nan_metrics_threshold_df.schema[SIGNAL_METRICS_THRESHOLD_VALUE].dataType == StringType():
+        is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
+            F.col(SIGNAL_METRICS_THRESHOLD_VALUE) != F.lit("")
+        )

-    is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
-        F.col(SIGNAL_METRICS_METRIC_VALUE) != F.lit("")
-    )
+    if is_not_nan_metrics_threshold_df.schema[SIGNAL_METRICS_METRIC_VALUE].dataType == StringType():
+        is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
+            F.col(SIGNAL_METRICS_METRIC_VALUE) != F.lit("")
+        )

     return is_not_nan_metrics_threshold_df

@@ -101,6 +107,11 @@ def evaluate_metrics_threshold(

 def calculate_metrics_breach(metrics_threshold_df: DataFrame):
     """Calculate the breached metrics by the given thresholds."""
+    metrics_threshold_df = metrics_threshold_df.withColumn(SIGNAL_METRICS_THRESHOLD_VALUE,
+                                                           F.col(SIGNAL_METRICS_THRESHOLD_VALUE).cast(DoubleType()))
+    metrics_threshold_df = metrics_threshold_df.withColumn(SIGNAL_METRICS_METRIC_VALUE,
+                                                           F.col(SIGNAL_METRICS_METRIC_VALUE).cast(DoubleType()))
+
     metrics_threshold_breached_df = metrics_threshold_df.where(
         (F.col(SIGNAL_METRICS_METRIC_NAME).isin(Metric_Value_Should_Greater_Than_Threshold) &
          (F.col(SIGNAL_METRICS_METRIC_VALUE) < F.col(SIGNAL_METRICS_THRESHOLD_VALUE))) |
diff --git a/assets/model_monitoring/components/tests/unit/test_evaluate_metrics_threshold.py b/assets/model_monitoring/components/tests/unit/test_evaluate_metrics_threshold.py
index f72fde39e0..f5ffb9cc20 100644
--- a/assets/model_monitoring/components/tests/unit/test_evaluate_metrics_threshold.py
+++ b/assets/model_monitoring/components/tests/unit/test_evaluate_metrics_threshold.py
@@ -75,6 +75,28 @@
 ]
 metrics_not_breached_df = create_pyspark_dataframe(metrics_not_breached, columns)

+metrics_str = [
+    (TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, "0.367", "0.5"),
+    (PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, "0.367", "0.5"),
+    (NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, "0.367", "0.5"),
+    (AVERAGE_COHERENCE_SCORE_METRIC_NAME, "3.0", "20.0"),
+    (AVERAGE_GROUNDEDNESS_SCORE_METRIC_NAME, "3.0", "5.0"),
+    (AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", "100"),
+    ("dropped_metric", "value", "threshold"),
+    ("dropped_metric_2", "three", "five"),
+    ]
+metrics_str_df = create_pyspark_dataframe(metrics_str, columns)
+
+expected_breached_data = [
+    (TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, 0.367, 0.5),
+    (PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.367, 0.5),
+    (NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, 0.367, 0.5),
+    (AVERAGE_COHERENCE_SCORE_METRIC_NAME, 3.0, 20.0),
+    (AVERAGE_GROUNDEDNESS_SCORE_METRIC_NAME, 3.0, 5.0),
+    (AVERAGE_FLUENCY_SCORE_METRIC_NAME, 3.0, 100.0),
+    ]
+expected_breached_df = create_pyspark_dataframe(expected_breached_data, columns)
+
 null_data = [
     (TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, None, None),
     (PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.8, None),
@@ -85,11 +107,12 @@
 ]
 test_null_df = create_pyspark_dataframe(null_data, columns)

+# For this setup, the metric_value and threshold_value columns are StringType()
 valid_clean_data = [
     (TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, 0.8, 0.5),
     (PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.8, 0.9),
     (NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, 0.5, 0.5),
-    # this is not a likely case but test non-empty strings
+    # The metric value column in the GSQ metrics table is string type because it can contain null/empty values
     (AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", '3.0'),
 ]
 test_clean_df = create_pyspark_dataframe(valid_clean_data, columns)
@@ -97,6 +120,30 @@
 spark = SparkSession.builder.getOrCreate()
 emptyRDD = spark.sparkContext.emptyRDD()

+clean_str_data = [
+    (AVERAGE_COHERENCE_SCORE_METRIC_NAME, None, "5.0"),
+    (AVERAGE_GROUNDEDNESS_SCORE_METRIC_NAME, "3.0", ""),
+    (AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", "threshold"),
+    (AVERAGE_SIMILARITY_SCORE_METRIC_NAME, "5.0", "5.0"),
+    (AVERAGE_RELEVANCE_SCORE_METRIC_NAME, "3.0", "4.0"),
+]
+clean_str_df = create_pyspark_dataframe(clean_str_data, columns)
+
+expected_clean_str_data = [
+    (AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", "threshold"),
+    (AVERAGE_SIMILARITY_SCORE_METRIC_NAME, "5.0", "5.0"),
+    (AVERAGE_RELEVANCE_SCORE_METRIC_NAME, "3.0", "4.0"),
+]
+expected_clean_str_df = create_pyspark_dataframe(expected_clean_str_data, columns)
+
+clean_double_data = [
+    (TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, 0.367, 0.5),
+    (PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.367, 0.5),
+    (NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, 0.367, 0.5),
+    (AGGREGATED_COHERENCE_PASS_RATE_METRIC_NAME, 3.0, 5.0),
+]
+clean_double_data_df = create_pyspark_dataframe(clean_double_data, columns)
+

 @pytest.mark.unit
 class TestEvaluateMetricsThreshold:
@@ -104,7 +151,8 @@ class TestEvaluateMetricsThreshold:

     @pytest.mark.parametrize("metrics_df, breached_metrics_df",
                              [(metrics_breached_df, metrics_breached_df),
-                              (metrics_not_breached_df, emptyRDD)])
+                              (metrics_not_breached_df, emptyRDD),
+                              (metrics_str_df, expected_breached_df)])
     def test_calculate_metrics_breach(
         self,
         metrics_df,
@@ -117,7 +165,9 @@ def test_calculate_metrics_breach(

     @pytest.mark.parametrize("metrics_df, expected_metrics_df",
                              [(test_null_df, emptyRDD),
-                              (test_clean_df, test_clean_df)])
+                              (test_clean_df, test_clean_df),
+                              (clean_str_df, expected_clean_str_df),
+                              (clean_double_data_df, clean_double_data_df)])
     def test_clean_metrics_df(self, metrics_df, expected_metrics_df):
         """Test clean metrics dataframe."""
         actual_cleaned_metrics_df = _clean_metrics_df(metrics_df)
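Reviewer note (not part of the diff): the change relies on two Spark behaviours. The schema check makes the empty-string filter run only when a column is StringType, and casting a non-numeric string to DoubleType yields null, so such rows can never satisfy a breach comparison. The standalone sketch below illustrates this with made-up column and metric names; it does not use the component's constants or helpers, only the PySpark APIs already imported in the diff.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Hypothetical metrics table; column and metric names are illustrative only.
df = spark.createDataFrame(
    [
        ("metric_a", "0.367", "0.5"),   # numeric strings: breaches after cast (0.367 < 0.5)
        ("metric_b", "", "0.7"),        # empty string: removed by the cleaning filter
        ("metric_c", "three", "0.7"),   # non-numeric: casts to null, never compares as breached
    ],
    ["metric_name", "metric_value", "threshold_value"],
)

# Cleaning: only filter empty strings when the column really is StringType,
# mirroring the conditional added to _clean_metrics_df.
if df.schema["metric_value"].dataType == StringType():
    df = df.filter(F.col("metric_value") != F.lit(""))

# Breach evaluation: cast both columns to double first, mirroring calculate_metrics_breach.
df = (df.withColumn("metric_value", F.col("metric_value").cast(DoubleType()))
        .withColumn("threshold_value", F.col("threshold_value").cast(DoubleType())))

breached = df.where(F.col("metric_value") < F.col("threshold_value"))
breached.show()  # only metric_a remains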