Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support regular parquet in test statistics #330

Merged
merged 21 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 59 additions & 18 deletions eogrow/utils/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from collections import defaultdict
from dataclasses import dataclass
from functools import partial
from typing import Any, Iterable, cast
from typing import Any, Iterable, Optional, cast

import fs
import geopandas as gpd
Expand Down Expand Up @@ -38,6 +38,7 @@ class StatCalcConfig:
unique_values_limit: int = 8
histogram_bin_num: int = 8
num_random_values: int = 8
parquet_epsg: Optional[int] = None # if set, tries to load parquet as a geoparquet


def compare_with_saved(stats: JsonDict, filename: str) -> DeepDiff:
Expand Down Expand Up @@ -75,7 +76,7 @@ def calculate_statistics(folder: str, config: StatCalcConfig) -> JsonDict:
for content in os.listdir(folder):
content_path = fs.path.combine(folder, content)

if os.path.isdir(content_path):
if os.path.isdir(content_path) and not content_path.endswith("parquet"):
fs_data_info = get_filesystem_data_info(OSFS("/"), content_path)
if fs_data_info.bbox is not None:
load_timestamps = fs_data_info.timestamps is not None
Expand All @@ -89,26 +90,33 @@ def calculate_statistics(folder: str, config: StatCalcConfig) -> JsonDict:
elif content_path.endswith(".npy"):
stats[content] = _calculate_numpy_stats(np.load(content_path, allow_pickle=True), config)
elif content_path.endswith((".geojson", ".gpkg")):
stats[content] = _calculate_vector_stats(gpd.read_file(content_path), config)
stats[content] = _calculate_vector_stats(gpd.read_file(content_path, engine="pyogrio"), config)
elif content_path.endswith(".parquet"):
try:
data = gpd.read_parquet(content_path)
except Exception:
data = _load_as_geoparquet(content_path)
stats[content] = _calculate_vector_stats(data, config)
stats[content] = _get_parquet_stats(content_path, config)
else:
stats[content] = None

return stats


def _load_as_geoparquet(path: str) -> gpd.GeoDataFrame:
data = pd.read_parquet(path)
if isinstance(data.geometry.iloc[0], str):
data.geometry = data.geometry.apply(wkt.loads)
elif isinstance(data.geometry.iloc[0], bytes):
data.geometry = data.geometry.apply(wkb.loads)
return gpd.GeoDataFrame(data, geometry="geometry", crs=data.utm_crs.iloc[0])
def _get_parquet_stats(content_path: str, config: StatCalcConfig) -> JsonDict:
    """Calculate statistics for a parquet file.

    First attempts to read the file as a geoparquet. On read failure it falls back
    to plain parquet, decoding a WKB/WKT-encoded `geometry` column if one exists.
    With `config.parquet_epsg` set the decoded geometries are promoted to a
    `GeoDataFrame` and vector statistics are computed; otherwise geometries are
    normalized to WKT strings and plain dataframe statistics are computed.

    :param content_path: Path of the parquet file to analyze.
    :param config: Configuration controlling how statistics are collected.
    :return: A JSON-serializable dictionary of statistics.
    """
    try:
        # only the read is guarded — a failure in stats calculation should surface,
        # not silently trigger the plain-parquet fallback
        data = gpd.read_parquet(content_path)
    except Exception:  # geopandas raises various error types for non-geo parquet files
        data = pd.read_parquet(content_path)
        # an empty frame offers no sample value to sniff the geometry encoding from
        if "geometry" not in data or data.empty:
            return _calculate_parquet_stats(data, config)

        loader = wkb.loads if isinstance(data.geometry.iloc[0], bytes) else wkt.loads
        data.geometry = data.geometry.apply(loader)

        if config.parquet_epsg:
            parsed_data = gpd.GeoDataFrame(data, geometry="geometry", crs=config.parquet_epsg)
            return _calculate_vector_stats(parsed_data, config)

        data.geometry = data.geometry.apply(wkt.dumps)
        return _calculate_parquet_stats(data, config)

    return _calculate_vector_stats(data, config)


def _calculate_eopatch_stats(eopatch: EOPatch, config: StatCalcConfig) -> JsonDict:
Expand Down Expand Up @@ -194,11 +202,12 @@ def _get_coords_sample(geom: Polygon | MultiPolygon | Any) -> list[tuple[float,
return [_rounder(point) for point in geom.exterior.coords[:10]] if isinstance(geom, Polygon) else None

stats = {
"columns": list(gdf),
"columns_and_dtypes": [list(item) for item in gdf.dtypes.astype(str).sort_index().items()],
"row_count": len(gdf),
"crs": str(gdf.crs),
"mean_area": _prepare_value(gdf.area.mean(), np.float64),
"total_bounds": [_prepare_value(x, dtype=np.float64) for x in gdf.total_bounds],
"agg_stats": _extract_dataframe_stats(gdf),
}

if len(gdf):
Expand All @@ -217,6 +226,23 @@ def _get_coords_sample(geom: Polygon | MultiPolygon | Any) -> list[tuple[float,
return stats


def _calculate_parquet_stats(data: pd.DataFrame, config: StatCalcConfig) -> JsonDict:
    """Collect schema, row-count, aggregate, and random-row statistics of a plain dataframe.

    :param data: The dataframe to summarize.
    :param config: Configuration controlling e.g. how many random rows are sampled.
    :return: A JSON-serializable dictionary of statistics.
    """
    dtype_pairs = data.dtypes.astype(str).sort_index().items()
    stats: JsonDict = {
        "columns_and_dtypes": [list(pair) for pair in dtype_pairs],
        "row_count": len(data),
        "agg_stats": _extract_dataframe_stats(data),
    }

    if not data.empty:
        # a fixed seed keeps the sampled rows stable across test runs
        sample_size = min(len(data), config.num_random_values)
        sample: pd.DataFrame = data.sample(sample_size, random_state=42)
        for column in sample.select_dtypes(include="number"):
            sample[column] = sample[column].apply(partial(_prepare_value, dtype=sample[column].dtype))

        stats["random_rows"] = json.loads(sample.to_json(orient="index", date_format="iso"))

    return stats


def _calculate_basic_stats(values: np.ndarray) -> dict[str, float]:
"""Randomly samples a small amount of points from the array (10% by default) to recalculate the statistics.
This introduces a 'positional instability' so that accidental mirroring or re-orderings are detected."""
Expand All @@ -230,6 +256,20 @@ def _calculate_basic_stats(values: np.ndarray) -> dict[str, float]:
}


def _extract_dataframe_stats(data: pd.DataFrame | gpd.GeoDataFrame) -> dict[str, Any] | None:
    """Compute mean/std/min/max aggregates over the numeric columns of a dataframe.

    Values of float32 columns are prepared as float32 so the recorded precision
    matches the source dtype; all other numeric columns use float64.

    :param data: The dataframe to aggregate.
    :return: A column-keyed dictionary of aggregates, or `None` if there are no numeric columns.
    """
    numeric_columns = data.select_dtypes(include="number").columns
    if numeric_columns.empty:
        return None

    float32_columns = set(data.columns[data.dtypes == "float32"])
    aggregates = data[numeric_columns].describe().loc[["mean", "std", "min", "max"]]
    for column in aggregates.columns:
        target_dtype = np.float32 if column in float32_columns else np.float64
        aggregates[column] = aggregates[column].apply(partial(_prepare_value, dtype=target_dtype))

    return aggregates.to_dict()


def _get_random_values(raster: np.ndarray, config: StatCalcConfig) -> list[float]:
"""It randomly samples a few values from the array and marks their locations."""
rng = np.random.default_rng(0)
Expand Down Expand Up @@ -314,6 +354,7 @@ def compare_content(
folder_path: str | None,
stats_path: str,
*,
config: StatCalcConfig | None = None,
save_new_stats: bool = False,
) -> None:
"""Compares the results from a pipeline run with the saved statistics. Constructed to be coupled with `run_config`
Expand All @@ -325,8 +366,8 @@ def compare_content(
"""
if folder_path is None:
raise ValueError("The given path is None. The pipeline likely has no `output_folder_key` parameter.")

stats = calculate_statistics(folder_path, config=StatCalcConfig())
config = StatCalcConfig() if config is None else config
stats = calculate_statistics(folder_path, config=config)

if save_new_stats:
save_statistics(stats, stats_path)
Expand Down
56 changes: 54 additions & 2 deletions tests/test_stats/import_vector/import_vector.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,33 @@
"bbox": "BBox(((729480.0, 4390045.0), (732120.0, 4391255.0)), crs=CRS('32638'))",
"vector_timeless": {
"LULC_vector": {
"columns": ["LULC", "LULC_ID", "LULC_POLYGON_ID", "area", "geometry"],
"agg_stats": {
"LULC_ID": {
"max": 9.0,
"mean": 5.3737051793,
"min": 0.0,
"std": 3.4493061816
},
"LULC_POLYGON_ID": {
"max": 2149.0,
"mean": 982.9561753,
"min": 5.0,
"std": 494.43910402
},
"area": {
"max": 1445567.532,
"mean": 5226.2068038,
"min": 0.077909185493,
"std": 44121.719048
}
},
"columns_and_dtypes": [
["LULC", "object"],
["LULC_ID", "int64"],
["LULC_POLYGON_ID", "int64"],
["area", "float64"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 2654.0304164,
"random_rows": {
Expand Down Expand Up @@ -165,7 +191,33 @@
"bbox": "BBox(((729480.0, 4391145.0), (732120.0, 4392355.0)), crs=CRS('32638'))",
"vector_timeless": {
"LULC_vector": {
"columns": ["LULC", "LULC_ID", "LULC_POLYGON_ID", "area", "geometry"],
"agg_stats": {
"LULC_ID": {
"max": 9.0,
"mean": 4.200729927,
"min": 0.0,
"std": 3.6711035527
},
"LULC_POLYGON_ID": {
"max": 2145.0,
"mean": 876.00547445,
"min": 5.0,
"std": 650.91998596
},
"area": {
"max": 440499.93262,
"mean": 9875.5129663,
"min": 0.43074247771,
"std": 30295.081901
}
},
"columns_and_dtypes": [
["LULC", "object"],
["LULC_ID", "int64"],
["LULC_POLYGON_ID", "int64"],
["area", "float64"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 5832.1912169,
"random_rows": {
Expand Down
14 changes: 12 additions & 2 deletions tests/test_stats/import_vector/import_vector_temporal.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
"bbox": "BBox(((729480.0, 4390045.0), (732120.0, 4391255.0)), crs=CRS('32638'))",
"vector": {
"crop": {
"columns": ["CROP_TYPE", "TIMESTAMP", "geometry"],
"agg_stats": null,
"columns_and_dtypes": [
["CROP_TYPE", "object"],
["TIMESTAMP", "datetime64[ns]"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 11132.309525,
"random_rows": {
Expand Down Expand Up @@ -45,7 +50,12 @@
"bbox": "BBox(((729480.0, 4391145.0), (732120.0, 4392355.0)), crs=CRS('32638'))",
"vector": {
"crop": {
"columns": ["CROP_TYPE", "TIMESTAMP", "geometry"],
"agg_stats": null,
"columns_and_dtypes": [
["CROP_TYPE", "object"],
["TIMESTAMP", "datetime64[ns]"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 7090.734867,
"random_rows": {
Expand Down
88 changes: 84 additions & 4 deletions tests/test_stats/rasterize/rasterize_feature_with_resolution.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,21 @@
},
"vector_timeless": {
"CROPS_VECTOR": {
"columns": ["CROP_TYPE", "POLYGON_ID", "DATE", "ORIGIN", "geometry"],
"agg_stats": {
"POLYGON_ID": {
"max": 327.0,
"mean": 222.13793103,
"min": 11.0,
"std": 108.78639757
}
},
"columns_and_dtypes": [
["CROP_TYPE", "object"],
["DATE", "object"],
["ORIGIN", "object"],
["POLYGON_ID", "int64"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 7950.3082411,
"random_rows": {
Expand Down Expand Up @@ -215,7 +229,33 @@
"total_bounds": [729480.0, 4390048.3693, 732120.0, 4391255.0]
},
"LULC_VECTOR": {
"columns": ["LULC", "LULC_ID", "LULC_POLYGON_ID", "area", "geometry"],
"agg_stats": {
"LULC_ID": {
"max": 9.0,
"mean": 5.3737051793,
"min": 0.0,
"std": 3.4493061816
},
"LULC_POLYGON_ID": {
"max": 2149.0,
"mean": 982.9561753,
"min": 5.0,
"std": 494.43910402
},
"area": {
"max": 1445567.532,
"mean": 5226.2068038,
"min": 0.077909185493,
"std": 44121.719048
}
},
"columns_and_dtypes": [
["LULC", "object"],
["LULC_ID", "int64"],
["LULC_POLYGON_ID", "int64"],
["area", "float64"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 2654.0304164,
"random_rows": {
Expand Down Expand Up @@ -443,7 +483,21 @@
},
"vector_timeless": {
"CROPS_VECTOR": {
"columns": ["CROP_TYPE", "POLYGON_ID", "DATE", "ORIGIN", "geometry"],
"agg_stats": {
"POLYGON_ID": {
"max": 334.0,
"mean": 169.15625,
"min": 9.0,
"std": 99.074787449
}
},
"columns_and_dtypes": [
["CROP_TYPE", "object"],
["DATE", "object"],
["ORIGIN", "object"],
["POLYGON_ID", "int64"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 20700.107806,
"random_rows": {
Expand Down Expand Up @@ -589,7 +643,33 @@
"total_bounds": [729480.0, 4391145.0, 732120.0, 4392355.0]
},
"LULC_VECTOR": {
"columns": ["LULC", "LULC_ID", "LULC_POLYGON_ID", "area", "geometry"],
"agg_stats": {
"LULC_ID": {
"max": 9.0,
"mean": 4.200729927,
"min": 0.0,
"std": 3.6711035527
},
"LULC_POLYGON_ID": {
"max": 2145.0,
"mean": 876.00547445,
"min": 5.0,
"std": 650.91998596
},
"area": {
"max": 440499.93262,
"mean": 9875.5129663,
"min": 0.43074247771,
"std": 30295.081901
}
},
"columns_and_dtypes": [
["LULC", "object"],
["LULC_ID", "int64"],
["LULC_POLYGON_ID", "int64"],
["area", "float64"],
["geometry", "geometry"]
],
"crs": "EPSG:32638",
"mean_area": 5832.1912169,
"random_rows": {
Expand Down
Loading
Loading