Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix evaluate metrics empty breached data #3173

Merged
merged 3 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
)
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import (
StringType,
DoubleType
)


# For the list of metrics, the users expect the value should be greater than the threshold,
Expand Down Expand Up @@ -72,13 +76,15 @@ def _clean_metrics_df(metrics_df: DataFrame) -> DataFrame:
F.col(SIGNAL_METRICS_METRIC_VALUE).isNotNull()
)

is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
F.col(SIGNAL_METRICS_THRESHOLD_VALUE) != F.lit("")
)
if is_not_nan_metrics_threshold_df.schema[SIGNAL_METRICS_THRESHOLD_VALUE].dataType == StringType():
is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
F.col(SIGNAL_METRICS_THRESHOLD_VALUE) != F.lit("")
)

is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
F.col(SIGNAL_METRICS_METRIC_VALUE) != F.lit("")
)
if is_not_nan_metrics_threshold_df.schema[SIGNAL_METRICS_METRIC_VALUE].dataType == StringType():
is_not_nan_metrics_threshold_df = is_not_nan_metrics_threshold_df.filter(
F.col(SIGNAL_METRICS_METRIC_VALUE) != F.lit("")
)

return is_not_nan_metrics_threshold_df

Expand All @@ -101,6 +107,11 @@ def evaluate_metrics_threshold(

def calculate_metrics_breach(metrics_threshold_df: DataFrame):
"""Calculate the breached metrics by the given thresholds."""
metrics_threshold_df = metrics_threshold_df.withColumn(SIGNAL_METRICS_THRESHOLD_VALUE,
F.col(SIGNAL_METRICS_THRESHOLD_VALUE).cast(DoubleType()))
metrics_threshold_df = metrics_threshold_df.withColumn(SIGNAL_METRICS_METRIC_VALUE,
F.col(SIGNAL_METRICS_METRIC_VALUE).cast(DoubleType()))

metrics_threshold_breached_df = metrics_threshold_df.where(
(F.col(SIGNAL_METRICS_METRIC_NAME).isin(Metric_Value_Should_Greater_Than_Threshold) &
(F.col(SIGNAL_METRICS_METRIC_VALUE) < F.col(SIGNAL_METRICS_THRESHOLD_VALUE))) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@
]
metrics_not_breached_df = create_pyspark_dataframe(metrics_not_breached, columns)

metrics_str = [
(TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, "0.367", "0.5"),
(PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, "0.367", "0.5"),
(NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, "0.367", "0.5"),
(AVERAGE_COHERENCE_SCORE_METRIC_NAME, "3.0", "20.0"),
(AVERAGE_GROUNDEDNESS_SCORE_METRIC_NAME, "3.0", "5.0"),
(AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", "100"),
("dropped_metric", "value", "threshold"),
("dropped_metric_2", "three", "five"),
]
metrics_str_df = create_pyspark_dataframe(metrics_str, columns)

expected_breached_data = [
(TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, 0.367, 0.5),
(PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.367, 0.5),
(NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, 0.367, 0.5),
(AVERAGE_COHERENCE_SCORE_METRIC_NAME, 3.0, 20.0),
(AVERAGE_GROUNDEDNESS_SCORE_METRIC_NAME, 3.0, 5.0),
(AVERAGE_FLUENCY_SCORE_METRIC_NAME, 3.0, 100.0),
]
expected_breached_df = create_pyspark_dataframe(expected_breached_data, columns)

null_data = [
(TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, None, None),
(PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.8, None),
Expand All @@ -85,26 +107,52 @@
]
test_null_df = create_pyspark_dataframe(null_data, columns)

# For this setup, the metric_value and threshold_value columns are StringType()
valid_clean_data = [
(TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, 0.8, 0.5),
(PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.8, 0.9),
(NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, 0.5, 0.5),
# this is not a likely case, but tests non-empty strings
# The metrics value column in the gsq metrics table is string type because there are null/empty values
(AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", '3.0'),
]
test_clean_df = create_pyspark_dataframe(valid_clean_data, columns)
# create empty dataframe
spark = SparkSession.builder.getOrCreate()
emptyRDD = spark.sparkContext.emptyRDD()

clean_str_data = [
(AVERAGE_COHERENCE_SCORE_METRIC_NAME, None, "5.0"),
(AVERAGE_GROUNDEDNESS_SCORE_METRIC_NAME, "3.0", ""),
(AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", "threshold"),
(AVERAGE_SIMILARITY_SCORE_METRIC_NAME, "5.0", "5.0"),
(AVERAGE_RELEVANCE_SCORE_METRIC_NAME, "3.0", "4.0"),
]
clean_str_df = create_pyspark_dataframe(clean_str_data, columns)

expected_clean_str_data = [
(AVERAGE_FLUENCY_SCORE_METRIC_NAME, "3.0", "threshold"),
(AVERAGE_SIMILARITY_SCORE_METRIC_NAME, "5.0", "5.0"),
(AVERAGE_RELEVANCE_SCORE_METRIC_NAME, "3.0", "4.0"),
]
expected_clean_str_df = create_pyspark_dataframe(expected_clean_str_data, columns)

clean_double_data = [
(TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST_METRIC_NAME, 0.367, 0.5),
(PEARSONS_CHI_SQUARED_TEST_METRIC_NAME, 0.367, 0.5),
(NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN_METRIC_NAME, 0.367, 0.5),
(AGGREGATED_COHERENCE_PASS_RATE_METRIC_NAME, 3.0, 5.0),
]
clean_double_data_df = create_pyspark_dataframe(clean_double_data, columns)


@pytest.mark.unit
class TestEvaluateMetricsThreshold:
"""Test class for evaluate metrics threshold component."""

@pytest.mark.parametrize("metrics_df, breached_metrics_df",
[(metrics_breached_df, metrics_breached_df),
(metrics_not_breached_df, emptyRDD)])
(metrics_not_breached_df, emptyRDD),
(metrics_str_df, expected_breached_df)])
def test_calculate_metrics_breach(
self,
metrics_df,
Expand All @@ -117,7 +165,9 @@ def test_calculate_metrics_breach(

@pytest.mark.parametrize("metrics_df, expected_metrics_df",
[(test_null_df, emptyRDD),
(test_clean_df, test_clean_df)])
(test_clean_df, test_clean_df),
(clean_str_df, expected_clean_str_df),
(clean_double_data_df, clean_double_data_df)])
def test_clean_metrics_df(self, metrics_df, expected_metrics_df):
"""Test clean metrics dataframe."""
actual_cleaned_metrics_df = _clean_metrics_df(metrics_df)
Expand Down
Loading