Skip to content

Commit

Permalink
feat: adding data_validator class to pysparky
Browse files Browse the repository at this point in the history
  • Loading branch information
cenzwong committed Oct 21, 2024
1 parent ed045f4 commit 1b734e5
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 74 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
"python.testing.pytestEnabled": true,
"editor.rulers": [
{"column": 88}
]
}
Binary file modified dist/pysparky-0.1.0-py3-none-any.whl
Binary file not shown.
Binary file modified dist/pysparky-0.1.0.tar.gz
Binary file not shown.
1 change: 1 addition & 0 deletions docs/data_validator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: pysparky.data_validator
220 changes: 148 additions & 72 deletions example/data_checking.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
"name": "stderr",
"output_type": "stream",
"text": [
"24/10/21 15:44:14 WARN Utils: Your hostname, codespaces-0aafae resolves to a loopback address: 127.0.0.1; using 10.0.10.95 instead (on interface eth0)\n",
"24/10/21 15:44:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n",
"24/10/21 21:28:44 WARN Utils: Your hostname, codespaces-0aafae resolves to a loopback address: 127.0.0.1; using 10.0.10.187 instead (on interface eth0)\n",
"24/10/21 21:28:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"24/10/21 15:44:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
"24/10/21 21:28:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
]
}
],
Expand Down Expand Up @@ -63,15 +63,7 @@
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"24/10/21 15:44:33 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors\n"
]
}
],
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
Expand Down Expand Up @@ -118,69 +110,84 @@
},
{
"cell_type": "code",
"execution_count": 36,
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass, field\n",
"\n",
"\n",
"@dataclass\n",
"class CheckField:\n",
" check_name: str\n",
" criteria: list[int]\n",
" criteria_and: int = field(init=False)\n",
"\n",
" def __post_init__(self):\n",
" self.criteria_and = F_.condition_and(*enabler.ensure_list(self.criteria))\n",
"\n",
"\n",
"@dataclass\n",
"class Criteria:\n",
" checks: list[CheckField]\n",
"\n",
" @classmethod\n",
" def fromDict(cls, data: dict[str, list[int]]):\n",
" checks = [CheckField(name, criteria) for name, criteria in data.items()]\n",
" return cls(checks=checks) # Adjust sdf as needed\n",
"\n",
" @property\n",
" def criteria_to_query(self):\n",
" return {\n",
" check_field.check_name: check_field.criteria_and\n",
" for check_field in self.checks\n",
" }\n",
"\n",
" def check_matrix(self, sdf):\n",
" self.sdf = sdf\n",
" return sdf.withColumns(self.criteria_to_query)\n",
"from pysparky.data_validator import DataValidator, ValidationRule"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| first_name|last_name| address| region|code|postcode|first_name_check|last_name_check|address_check|region_check|code_check|postcode_check|\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| John| Doe| 120 jefferson st.| Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| Jack| McGinnis| 220 hobo Av.| Phila| PA| 9119| true| true| true| true| false| true|\n",
"| John \"Da Man\"| Repici| 120 Jefferson St.| Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| Stephen| Tyler|7452 Terrace \"At ...| SomeTown| SD| 91234| true| true| true| true| true| true|\n",
"| NaN| Blankman| NaN| SomeTown| SD| 298| true| true| true| true| false| true|\n",
"|Joan \"the bone\", ...| Jet| 9th, at Terrace plc|Desert City| CO| 123| true| true| true| true| true| true|\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"\n",
"+-------------+---------+-----------------+---------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| first_name|last_name| address| region|code|postcode|first_name_check|last_name_check|address_check|region_check|code_check|postcode_check|\n",
"+-------------+---------+-----------------+---------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| John| Doe|120 jefferson st.|Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| Jack| McGinnis| 220 hobo Av.| Phila| PA| 9119| true| true| true| true| false| true|\n",
"|John \"Da Man\"| Repici|120 Jefferson St.|Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| NaN| Blankman| NaN| SomeTown| SD| 298| true| true| true| true| false| true|\n",
"+-------------+---------+-----------------+---------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"\n",
"+--------------------+---------+--------------------+-----------+----+--------+\n",
"| first_name|last_name| address| region|code|postcode|\n",
"+--------------------+---------+--------------------+-----------+----+--------+\n",
"| Stephen| Tyler|7452 Terrace \"At ...| SomeTown| SD| 91234|\n",
"|Joan \"the bone\", ...| Jet| 9th, at Terrace plc|Desert City| CO| 123|\n",
"+--------------------+---------+--------------------+-----------+----+--------+\n",
"\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| first_name|last_name| address| region|code|postcode|first_name_check|last_name_check|address_check|region_check|code_check|postcode_check|\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| John| Doe| 120 jefferson st.| Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| Jack| McGinnis| 220 hobo Av.| Phila| PA| 9119| true| true| true| true| false| true|\n",
"| John \"Da Man\"| Repici| 120 Jefferson St.| Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| Stephen| Tyler|7452 Terrace \"At ...| SomeTown| SD| 91234| true| true| true| true| true| true|\n",
"| NaN| Blankman| NaN| SomeTown| SD| 298| true| true| true| true| false| true|\n",
"|Joan \"the bone\", ...| Jet| 9th, at Terrace plc|Desert City| CO| 123| true| true| true| true| true| true|\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"\n"
]
}
],
"source": [
"ValidationRules = [\n",
" ValidationRule(\"first_name_check\", F_.printable_only(\"first_name\")),\n",
" ValidationRule(\"last_name_check\", F_.printable_only(\"last_name\")),\n",
" ValidationRule(\"address_check\", F_.printable_only(\"address\")),\n",
" ValidationRule(\"region_check\", F_.printable_only(\"region\")),\n",
" ValidationRule(\"code_check\", [F_.two_character_only(\"code\")]),\n",
" ValidationRule(\"postcode_check\", F_.printable_only(\"postcode\")),\n",
"]\n",
"\n",
" def false_data(self, sdf):\n",
" return te.filters(\n",
" self.check_matrix(sdf),\n",
" [\n",
" # Either cases is False\n",
" F.col(column_name) == False\n",
" for column_name in self.criteria_to_query.keys()\n",
" ],\n",
" operator_=\"or\",\n",
" )\n",
"validator = DataValidator(ValidationRules)\n",
"validator.apply_conditions(data_sdf).show()\n",
"validator.filter_invalid(data_sdf).show()\n",
"validator.filter_valid(data_sdf).select(data_sdf.columns).show()\n",
"\n",
" def positive_data(self, sdf):\n",
" return te.filters(\n",
" self.check_matrix(sdf),\n",
" [\n",
" # All cases is True\n",
" F.col(column_name) == True\n",
" for column_name in self.criteria_to_query.keys()\n",
" ],\n",
" operator_=\"and\",\n",
" )"
"data_sdf.withColumns(validator.query_map).show()"
]
},
{
"cell_type": "code",
"execution_count": 41,
"execution_count": 8,
"metadata": {},
"outputs": [
{
Expand Down Expand Up @@ -213,12 +220,30 @@
"| Stephen| Tyler|7452 Terrace \"At ...| SomeTown| SD| 91234|\n",
"|Joan \"the bone\", ...| Jet| 9th, at Terrace plc|Desert City| CO| 123|\n",
"+--------------------+---------+--------------------+-----------+----+--------+\n",
"\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| first_name|last_name| address| region|code|postcode|first_name_check|last_name_check|address_check|region_check|code_check|postcode_check|\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"| John| Doe| 120 jefferson st.| Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| Jack| McGinnis| 220 hobo Av.| Phila| PA| 9119| true| true| true| true| false| true|\n",
"| John \"Da Man\"| Repici| 120 Jefferson St.| Riverside| NJ| 8075| true| true| true| true| false| true|\n",
"| Stephen| Tyler|7452 Terrace \"At ...| SomeTown| SD| 91234| true| true| true| true| true| true|\n",
"| NaN| Blankman| NaN| SomeTown| SD| 298| true| true| true| true| false| true|\n",
"|Joan \"the bone\", ...| Jet| 9th, at Terrace plc|Desert City| CO| 123| true| true| true| true| true| true|\n",
"+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"24/10/21 21:29:03 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors\n"
]
}
],
"source": [
"criteria = {\n",
"conditions = {\n",
" \"first_name_check\": F_.printable_only(\"first_name\"),\n",
" \"last_name_check\": F_.printable_only(\"last_name\"),\n",
" \"address_check\": F_.printable_only(\"address\"),\n",
Expand All @@ -227,13 +252,64 @@
" \"postcode_check\": F_.printable_only(\"postcode\"),\n",
"}\n",
"\n",
"# print(Criteria.fromDict(criteria).criteria_to_query)\n",
"Criteria.fromDict(criteria).check_matrix(data_sdf).show()\n",
"Criteria.fromDict(criteria).false_data(data_sdf).show()\n",
"Criteria.fromDict(criteria).positive_data(data_sdf).select(data_sdf.columns).show()\n",
"\n",
"# data_sdf.withColumns(Criteria.fromDict(criteria).criteria_to_query).show()"
"validator = DataValidator.from_dict(conditions)\n",
"validator.apply_conditions(data_sdf).show()\n",
"validator.filter_invalid(data_sdf).show()\n",
"validator.filter_valid(data_sdf).select(data_sdf.columns).show()\n",
"\n",
"data_sdf.withColumns(validator.query_map).show()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"data = [(1, \"valid\"), (2, \"invalid\"), (3, \"valid\")]\n",
"sdf = spark.createDataFrame(data, [\"id\", \"status\"])\n",
"\n",
"# Define validation rules\n",
"rules_dict = {\"status_check\": [F.col(\"status\") == \"valid\"]}\n",
"validator = DataValidator.from_dict(rules_dict)\n",
"\n",
"# Check query map\n",
"query_map = validator.query_map\n",
"result_data = sdf.withColumns(query_map).collect()\n",
"expected_data = [(1, \"valid\", True), (2, \"invalid\", False), (3, \"valid\", True)]\n",
"\n",
"assert result_data == expected_data\n",
"# assert \"status_check\" in query_map\n",
"# assert query_map[\"status_check\"] == (F.col(\"status\") == \"valid\")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=1, status='valid', status_check=True),\n",
" Row(id=2, status='invalid', status_check=False),\n",
" Row(id=3, status='valid', status_check=True)]"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
6 changes: 6 additions & 0 deletions lint.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
echo "=== Black ==="
black .
echo "=== isort ==="
python -m isort .
echo "=== pylint ==="
pylint pysparky
echo "=== mypy ==="
mypy .
echo "=== pytest ==="
pytest
echo "=== build ==="
python -m build
3 changes: 2 additions & 1 deletion pysparky.egg-info/SOURCES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ LICENSE
README.md
pyproject.toml
pysparky/__init__.py
pysparky/data_validator.py
pysparky/debug.py
pysparky/decorator.py
pysparky/enabler.py
pysparky/quality.py
pysparky/reader_options.py
pysparky/schema_ext.py
pysparky/spark_ext.py
pysparky/typing.py
Expand All @@ -23,6 +23,7 @@ pysparky/functions/math_.py
pysparky/transformations/__init__.py
pysparky/transformations/dedup.py
pysparky/transformations/general.py
tests/test_data_validator.py
tests/test_debug.py
tests/test_decorator.py
tests/test_enabler.py
Expand Down
Loading

0 comments on commit 1b734e5

Please sign in to comment.