Support regular parquet in test statistics #330

Merged: 21 commits, Mar 13, 2024 (diff shown at 15 of 21 commits)
64 changes: 45 additions & 19 deletions eogrow/utils/testing.py
@@ -7,9 +7,9 @@
 import json
 import os
 from collections import defaultdict
-from dataclasses import dataclass
+from dataclasses import dataclass, replace
 from functools import partial
-from typing import Any, Iterable, cast
+from typing import Any, Iterable, Optional, cast

 import fs
 import geopandas as gpd
@@ -38,6 +38,7 @@ class StatCalcConfig:
     unique_values_limit: int = 8
     histogram_bin_num: int = 8
     num_random_values: int = 8
+    parquet_epsg: Optional[int] = None  # if set, tries to load parquet as a geoparquet


 def compare_with_saved(stats: JsonDict, filename: str) -> DeepDiff:
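As a quick illustration of the new `parquet_epsg` field, the standalone sketch below (not part of this PR; the EPSG code 32633 is invented) shows a test constructing the config and, as `calculate_statistics` does later in this diff, deriving a copy with row sampling disabled via `dataclasses.replace`:

```python
from dataclasses import replace

from eogrow.utils.testing import StatCalcConfig

# Hypothetical value: EPSG 32633 (UTM zone 33N) is only an example.
config = StatCalcConfig(parquet_epsg=32633)

# Same trick used in calculate_statistics below: copy the config with
# random-row sampling turned off, leaving all other fields untouched.
no_sampling_config = replace(config, num_random_values=0)
assert no_sampling_config.parquet_epsg == 32633
assert no_sampling_config.num_random_values == 0
```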
@@ -75,7 +76,7 @@ def calculate_statistics(folder: str, config: StatCalcConfig) -> JsonDict:
     for content in os.listdir(folder):
         content_path = fs.path.combine(folder, content)

-        if os.path.isdir(content_path):
+        if os.path.isdir(content_path) and not content_path.endswith("parquet"):
             fs_data_info = get_filesystem_data_info(OSFS("/"), content_path)
             if fs_data_info.bbox is not None:
                 load_timestamps = fs_data_info.timestamps is not None
@@ -89,26 +90,34 @@ def calculate_statistics(folder: str, config: StatCalcConfig) -> JsonDict:
         elif content_path.endswith(".npy"):
             stats[content] = _calculate_numpy_stats(np.load(content_path, allow_pickle=True), config)
         elif content_path.endswith((".geojson", ".gpkg")):
-            stats[content] = _calculate_vector_stats(gpd.read_file(content_path), config)
+            stats[content] = _calculate_vector_stats(gpd.read_file(content_path, engine="pyogrio"), config)
         elif content_path.endswith(".parquet"):
-            try:
-                data = gpd.read_parquet(content_path)
-            except Exception:
-                data = _load_as_geoparquet(content_path)
-            stats[content] = _calculate_vector_stats(data, config)
+            # don't sample random rows due to non-determinism in parquet row order
+            stats[content] = _get_parquet_stats(content_path, replace(config, num_random_values=0))
         else:
             stats[content] = None

     return stats


-def _load_as_geoparquet(path: str) -> gpd.GeoDataFrame:
-    data = pd.read_parquet(path)
-    if isinstance(data.geometry.iloc[0], str):
-        data.geometry = data.geometry.apply(wkt.loads)
-    elif isinstance(data.geometry.iloc[0], bytes):
-        data.geometry = data.geometry.apply(wkb.loads)
-    return gpd.GeoDataFrame(data, geometry="geometry", crs=data.utm_crs.iloc[0])
+def _get_parquet_stats(content_path: str, config: StatCalcConfig) -> JsonDict:
+    try:
+        data = gpd.read_parquet(content_path, engine="pyogrio")
+        return _calculate_vector_stats(data, config)
+    except Exception:
+        data = pd.read_parquet(content_path)
+        if "geometry" not in data:
+            return _calculate_parquet_stats(data, config)
+
+        loader = wkb.loads if isinstance(data.geometry.iloc[0], bytes) else wkt.loads
+        data.geometry = data.geometry.apply(loader)
+
+        if config.parquet_epsg:
+            parsed_data = gpd.GeoDataFrame(data, geometry="geometry", crs=config.parquet_epsg)
+            return _calculate_vector_stats(parsed_data, config)
+
+        data.geometry = data.geometry.apply(wkt.dumps)
+        return _calculate_parquet_stats(data, config)


 def _calculate_eopatch_stats(eopatch: EOPatch, config: StatCalcConfig) -> JsonDict:
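For readers unfamiliar with the fallback path above, the self-contained sketch below (not taken from the PR; the file name and EPSG code are made up) round-trips a "regular" parquet file with WKB-encoded geometries through the same decoding steps that `_get_parquet_stats` applies when `geopandas.read_parquet` raises:

```python
import geopandas as gpd
import pandas as pd
from shapely import wkb, wkt
from shapely.geometry import Point

# Write a plain parquet file whose geometry column holds raw WKB bytes.
plain = pd.DataFrame({"id": [1, 2], "geometry": [Point(0, 0).wkb, Point(1, 1).wkb]})
plain.to_parquet("plain_example.parquet")

# Decode the geometry column the same way the except-branch does.
data = pd.read_parquet("plain_example.parquet")
loader = wkb.loads if isinstance(data.geometry.iloc[0], bytes) else wkt.loads
data.geometry = data.geometry.apply(loader)

# With parquet_epsg set, the frame is promoted to a GeoDataFrame; otherwise the
# geometries are dumped back to WKT strings and treated as plain tabular data.
gdf = gpd.GeoDataFrame(data, geometry="geometry", crs=32633)
print(gdf.crs)
```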
@@ -194,7 +203,7 @@ def _get_coords_sample(geom: Polygon | MultiPolygon | Any) -> list[tuple[float,
         return [_rounder(point) for point in geom.exterior.coords[:10]] if isinstance(geom, Polygon) else None

     stats = {
-        "columns": list(gdf),
+        "columns_and_dtypes": list(gdf.dtypes.astype(str).sort_index().items()),
         "row_count": len(gdf),
         "crs": str(gdf.crs),
         "mean_area": _prepare_value(gdf.area.mean(), np.float64),
@@ -217,6 +226,22 @@ def _get_coords_sample(geom: Polygon | MultiPolygon | Any) -> list[tuple[float,
     return stats


+def _calculate_parquet_stats(data: pd.DataFrame, config: StatCalcConfig) -> JsonDict:
+    stats = {
+        "columns_and_dtypes": list(data.dtypes.astype(str).sort_index().items()),
+        "row_count": len(data),
+    }
+
+    if len(data):
+        subsample: pd.DataFrame = data.sample(min(len(data), config.num_random_values), random_state=42)
+        for col in subsample.select_dtypes(include="number").columns.values:
+            subsample[col] = subsample[col].apply(partial(_prepare_value, dtype=subsample[col].dtype))
+
+        subsample_json_string = subsample.to_json(orient="index", date_format="iso")
+        stats["random_rows"] = json.loads(subsample_json_string)
+    return stats


 def _calculate_basic_stats(values: np.ndarray) -> dict[str, float]:
     """Randomly samples a small amount of points from the array (10% by default) to recalculate the statistics.
     This introduces a 'positional instability' so that accidental mirroring or re-orderings are detected."""
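To make the shape of the recorded statistics concrete, here is a small illustrative sketch (the toy DataFrame is invented, not from the PR) of the `_calculate_parquet_stats`-style output for a plain table; note that `calculate_statistics` passes `num_random_values=0` for parquet files, so in that path the sampled rows are empty:

```python
import json

import pandas as pd

# Toy data, invented for illustration.
data = pd.DataFrame({"b": [1.5, 2.5, 3.5], "a": ["x", "y", "z"]})

stats = {
    # Sorted (column, dtype) pairs, e.g. [("a", "object"), ("b", "float64")]
    "columns_and_dtypes": list(data.dtypes.astype(str).sort_index().items()),
    "row_count": len(data),
}

# Deterministic subsample thanks to the fixed random_state.
subsample = data.sample(min(len(data), 8), random_state=42)
stats["random_rows"] = json.loads(subsample.to_json(orient="index", date_format="iso"))
print(stats["columns_and_dtypes"], stats["row_count"])
```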
@@ -314,6 +339,7 @@ def compare_content(
     folder_path: str | None,
     stats_path: str,
     *,
+    config: StatCalcConfig | None = None,
     save_new_stats: bool = False,
 ) -> None:
     """Compares the results from a pipeline run with the saved statistics. Constructed to be coupled with `run_config`
@@ -325,8 +351,8 @@
     """
     if folder_path is None:
         raise ValueError("The given path is None. The pipeline likely has no `output_folder_key` parameter.")
-
-    stats = calculate_statistics(folder_path, config=StatCalcConfig())
+    config = StatCalcConfig() if config is None else config
+    stats = calculate_statistics(folder_path, config=config)

     if save_new_stats:
         save_statistics(stats, stats_path)
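A rough sketch of how a pipeline test could thread the new parameter through `compare_content` (the folder and stats paths are placeholders, not from this repository):

```python
from eogrow.utils.testing import StatCalcConfig, compare_content

# Placeholder paths and EPSG code, for illustration only.
compare_content(
    folder_path="test_outputs/export_pipeline",
    stats_path="tests/test_stats/export_pipeline.json",
    config=StatCalcConfig(parquet_epsg=32633),  # parse plain parquet as geoparquet in UTM 33N
    save_new_stats=False,
)
```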