
[BUG] Some values are copied over from previous runs #43

Open
holdenk opened this issue Oct 3, 2023 · 0 comments
Labels
bug Something isn't working

holdenk (Contributor) commented Oct 3, 2023

Describe the bug
When multiple rules run, some values recorded in the dq_stats table (e.g. a rule's description) are copied over from a previous rule's run instead of coming from the rule that actually ran.

To Reproduce

Run

from pyspark import SparkFiles
from pyspark.sql import *
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter

spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

#tag::global_setup[]
se_conf = {
    "se_notifications_enable_email": False,
    "se_notifications_email_smtp_host": "mailhost.example.com",
    "se_notifications_email_smtp_port": 25,
    "se_notifications_email_from": "[email protected]",
    "se_notifications_email_subject": "spark expectations - data quality - notifications",
    "se_notifications_on_fail": True,
    "se_notifications_on_error_drop_exceeds_threshold_breach": True,
    "se_notifications_on_error_drop_threshold": 15,
}
#end::global_setup[]


#tag::setup_and_load[]
from spark_expectations.config.user_config import Constants as user_config

spark.sql("DROP TABLE IF EXISTS local.magic_validation")
spark.sql("DROP TABLE IF EXISTS local.dq_stats")
spark.sql(
    """
create table local.magic_validation (
    product_id STRING,
    table_name STRING,
    rule_type STRING,
    rule STRING,
    column_name STRING,
    expectation STRING,
    action_if_failed STRING,
    tag STRING,
    description STRING,
    enable_for_source_dq_validation BOOLEAN,
    enable_for_target_dq_validation BOOLEAN,
    is_active BOOLEAN,
    enable_error_drop_alert BOOLEAN,
    error_drop_threshold INT
)"""
)
# Reminder: addFile does not handle directories well.
rule_file = "spark_expectations_sample_rules.json"
sc.addFile(rule_file)
df = spark.read.json(SparkFiles.get(rule_file))
print(df)
df.write.option("byname", "true").mode("append").saveAsTable("local.magic_validation")
spark.read.table("local.magic_validation").show()

# Can be used to point to your desired metastore.
se_writer = WrappedDataFrameWriter().mode("append").format("iceberg")

rule_df = spark.sql("select * from local.magic_validation")

se: SparkExpectations = SparkExpectations(
    rules_df=rule_df, # See if we can replace this with the DF we wrote out.
    product_id="pay", # We will only apply rules matching this product id
    stats_table="local.dq_stats",
    stats_table_writer=se_writer,
    target_and_error_table_writer=se_writer,
    stats_streaming_options={user_config.se_enable_streaming: False},
)
#end::setup_and_load[]
rule_df.show(truncate=200)


#tag::run_validation_row[]
@se.with_expectations(
    user_conf=se_conf,
    write_to_table=False, # If set to true SE will write to the target table.
    target_and_error_table_writer=se_writer,
    # target_table is used to create the error table (e.g. here local.fake_table_name_error)
    # and filter the rules on top of the global product filter.
    target_table="local.fake_table_name",
)
def load_data():
    raw_df = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)
    uk_df = raw_df.select("CompanyNumber", "MaleBonusPercent", "FemaleBonusPercent")
    return uk_df


data = load_data()
#end::run_validation_row[]

#tag::run_validation_complex[]
@se.with_expectations(
    user_conf=se_conf,
    write_to_table=True, # If set to true SE will write to the target table.
    target_and_error_table_writer=se_writer,
    # target_table is used to create the error table (e.g. here local.fake_table_name_error)
    # and filter the rules on top of the global product filter.
    target_table="local.3rd_fake",
)
def load_data2():
    raw_df = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)
    uk_df = raw_df.select("CompanyNumber", "MaleBonusPercent", "FemaleBonusPercent")
    return uk_df


data = load_data2()
#end::run_validation_complex[]

spark.sql("SELECT table_name, error_percentage, * FROM local.dq_stats").show(truncate=300)

With the following rules:

{"product_id": "pay", "table_name": "local.fake_table_name", "rule_type": "row_dq", "rule": "bonus_checker", "column_name": "MaleBonusPercent", "expectation": "MaleBonusPercent > FemaleBonusPercent", "action_if_failed": "drop", "tag": "", "description": "Sample rule that the male bonuses should be higher. Thankfully this fails (but could be lower base pay etc.)", "enable_for_source_dq_validation": true, "enable_for_target_dq_validation": false, "is_active": true, "enable_error_drop_alert": true, "error_drop_threshold": 1}
{"product_id": "pay", "table_name": "local.3rd_fake", "rule_type": "query_dq", "rule": "history", "column_name": "Bloop", "expectation": "(select count(*) from 3rd_fake_view) < (select input_count from local.dq_stats WHERE table_name='local.3rd_fake')", "action_if_failed": "fail", "tag": "", "description": "We should always have more records than before", "enable_for_source_dq_validation": false, "enable_for_target_dq_validation": true, "is_active": true, "enable_error_drop_alert": true, "error_drop_threshold": 1}

The description for the 2nd rule is copied over from the first, e.g. in this dq_stats row:

|local.fake_table_name| 67.51| pay|local.fake_table_name| 15267| 10306| 4961| 32.49| 32.49| 67.51| null| null| null| null|[{rule_type -> row_dq, description -> Sample rule that the male bonuses should be higher. Thankfully this fails (but could be lower base pay etc.), rule -> bonus_checker, failed_row_count -> 10306, tag -> , action_if_failed -> drop}]|[{rule_type -> row_dq, rule_name -> bonus_checker, error_drop_threshold -> 1, description -> Sample rule that the male bonuses should be higher. Thankfully this fails (but could be lower base pay etc.), error_drop_percentage -> 67.51, action_if_failed -> drop}]| {final_query_dq -> Skipped, source_agg_dq -> Skipped, final_agg_dq -> Skipped, source_query_dq -> Skipped, run_status -> Passed, row_dq -> Passed}|{row_dq_run_time -> 1.7, final_agg_dq_run_time -> 0.0, run_time -> 2.2, source_query_dq_run_time -> 0.0, source_agg_dq_run_time -> 0.0, final_query_dq_run_time -> 0.0}|{rules -> {num_dq_rules -> 1, num_row_dq_rules -> 1}, query_dq_rules -> {num_final_query_dq_rules -> 0, num_source_query_dq_rules -> 0, num_query_dq_rules -> 0}, agg_dq_rules -> {num_source_agg_dq_rules -> 0, num_agg_dq_rules -> 0, num_final_agg_dq_rules -> 0}}|pay_4d5010a0-6192-11ee-8533-afb51606381c| 2023-10-03| 2023-10-03 09:12:28|
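
For reference, the rules table written by the repro script does hold a distinct description per rule, which can be confirmed using the table and column names defined in the script above:

spark.table("local.magic_validation") \
    .select("table_name", "rule", "description") \
    .show(truncate=False)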

Expected behavior

Each rule's own description should be populated, not the previous rule's.
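
A rough sketch of a regression check for this, assuming the stats row carries an array-of-maps summary column with rule and description keys as in the sample row above. The column name dq_rules below is hypothetical and would need to be replaced with the actual summary column from the dq_stats schema:

from pyspark.sql import functions as F

# Expected: each rule's own description, keyed by rule name, taken straight
# from the rules table created by the repro script.
expected = {
    r["rule"]: r["description"]
    for r in spark.table("local.magic_validation").select("rule", "description").collect()
}

# Observed: per-rule summaries recorded in the stats table. NOTE: "dq_rules"
# is a hypothetical column name used for illustration; some summaries in the
# sample output also key the rule name as "rule_name" rather than "rule".
for row in spark.table("local.dq_stats").select(F.explode("dq_rules").alias("m")).collect():
    m = row["m"]  # a map with keys like "rule" and "description"
    assert m["description"] == expected[m["rule"]], (
        "rule %s carries a copied description: %s" % (m["rule"], m["description"])
    )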

Screenshots

See 1:46 of https://www.youtube.com/watch?v=bNvvPKv-dmQ

Desktop (please complete the following information):

  • OS: Linux
  • Browser: N/A
  • Version: 1.0


holdenk added the bug label on Oct 3, 2023