Improve catalog validation and column statistics (#404)
* Change validation for improved dataset reads. Add statistics method on catalog.

* Skip pixel validation for index catalogs.

* Pylint fixes from main merge.

* Comments from code review.

* Pylint fixes.
delucchi-cmu authored Oct 31, 2024
1 parent 7783fbf commit 8e1d456
Showing 5 changed files with 107 additions and 82 deletions.
25 changes: 24 additions & 1 deletion src/hats/catalog/dataset/dataset.py
@@ -1,12 +1,14 @@
from __future__ import annotations

from pathlib import Path
from typing import List

import pyarrow as pa
from upath import UPath

from hats.catalog.dataset.table_properties import TableProperties
from hats.io import file_io
from hats.io.parquet_metadata import aggregate_column_statistics


# pylint: disable=too-few-public-methods
@@ -34,5 +36,26 @@ def __init__(
self.catalog_path = catalog_path
self.on_disk = catalog_path is not None
self.catalog_base_dir = file_io.get_upath(self.catalog_path)

self.schema = schema

def aggregate_column_statistics(
self,
exclude_hats_columns: bool = True,
exclude_columns: List[str] = None,
include_columns: List[str] = None,
):
"""Read footer statistics in parquet metadata, and report on global min/max values.
Args:
exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
from the statistics. Defaults to True.
exclude_columns (List[str]): additional columns to exclude from the statistics.
include_columns (List[str]): if specified, only return statistics for the column
names provided. Defaults to None, and returns all non-hats columns.
"""
return aggregate_column_statistics(
self.catalog_base_dir / "dataset" / "_metadata",
exclude_hats_columns=exclude_hats_columns,
exclude_columns=exclude_columns,
include_columns=include_columns,
)
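
The new aggregate_column_statistics method reads statistics already stored in the parquet _metadata footer, so it reports per-column min/max values without scanning any data files. A minimal usage sketch, assuming a local HATS catalog (the path below is hypothetical):

from hats.loaders import read_hats

# Hypothetical path; any HATS catalog with a dataset/_metadata file works.
catalog = read_hats("path/to/small_sky_order1")

# Global min/max per column; HATS spatial and partitioning columns are
# excluded by default (exclude_hats_columns=True).
print(catalog.aggregate_column_statistics())

# Restrict the report to named columns.
print(catalog.aggregate_column_statistics(include_columns=["ra", "dec"]))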
2 changes: 1 addition & 1 deletion src/hats/catalog/partition_info.py
@@ -277,4 +277,4 @@ def calculate_fractional_coverage(self):
area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
# 41253 is the number of square degrees in a sphere
# https://en.wikipedia.org/wiki/Square_degree
return (area_by_order * cov_count).sum() / 41253
return (area_by_order * cov_count).sum() / (360**2 / np.pi)
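
The replacement divides by the exact whole-sky area rather than the rounded constant: 4*pi steradians times (180/pi)**2 square degrees per steradian simplifies to 360**2/pi, about 41252.96 square degrees. A quick sanity check of that identity (illustration only, not part of the change):

import numpy as np

# Whole sphere: 4*pi steradians, at (180/pi)**2 square degrees per steradian.
whole_sky_sq_deg = 4 * np.pi * (180 / np.pi) ** 2
assert np.isclose(whole_sky_sq_deg, 360**2 / np.pi)
print(whole_sky_sq_deg)  # ~41252.96, the value the old 41253 approximated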
116 changes: 52 additions & 64 deletions src/hats/io/validation.py
@@ -4,13 +4,14 @@
from pathlib import Path

import numpy as np
import pyarrow.dataset as pds
from upath import UPath

import hats.pixel_math.healpix_shim as hp
from hats.catalog.dataset.table_properties import TableProperties
from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset
from hats.catalog.partition_info import PartitionInfo
from hats.io import get_common_metadata_pointer, get_parquet_metadata_pointer, get_partition_info_pointer
from hats.io.file_io import read_parquet_dataset
from hats.io.file_io import get_upath
from hats.io.file_io.file_pointer import is_regular_file
from hats.io.paths import get_healpix_from_path
from hats.loaders import read_hats
@@ -40,6 +41,7 @@ def is_valid_catalog(
True if both the properties and partition_info files are
valid, False otherwise
"""
pointer = get_upath(pointer)
if not strict:
return is_catalog_info_valid(pointer) and (
is_partition_info_valid(pointer) or is_metadata_valid(pointer)
@@ -67,9 +69,6 @@ def handle_error(msg):
if not is_catalog_info_valid(pointer):
handle_error("properties file does not exist or is invalid.")

if not is_partition_info_valid(pointer):
handle_error("partition_info.csv file does not exist.")

if not is_metadata_valid(pointer):
handle_error("_metadata file does not exist.")

@@ -83,74 +82,63 @@

# Load as a catalog object. Confirms that the catalog info matches type.
catalog = read_hats(pointer)
expected_pixels = sort_pixels(catalog.get_healpix_pixels())

if verbose:
print(f"Found {len(expected_pixels)} partitions.")

## Compare the pixels in _metadata with partition_info.csv
metadata_file = get_parquet_metadata_pointer(pointer)

# Use both strategies of reading the partition info: strict and !strict.
metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
## Load as parquet dataset. Allow errors, and check pixel set against _metadata
# As a side effect, this confirms that we can load the directory as a valid dataset.
dataset = pds.parquet_dataset(
metadata_file.path,
filesystem=metadata_file.fs,
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (strict)")

metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
if isinstance(catalog, HealpixDataset):
if not is_partition_info_valid(pointer):
handle_error("partition_info.csv file does not exist.")
return is_valid

partition_info_file = get_partition_info_pointer(pointer)
csv_pixels = sort_pixels(PartitionInfo.read_from_csv(partition_info_file).get_healpix_pixels())
if not np.array_equal(expected_pixels, csv_pixels):
handle_error("Partition pixels differ between catalog and partition_info.csv file")
expected_pixels = sort_pixels(catalog.get_healpix_pixels())

## Load as parquet dataset. Allow errors, and check pixel set against _metadata
ignore_prefixes = [
"_common_metadata",
"_metadata",
"catalog_info.json",
"properties",
"provenance_info.json",
"partition_info.csv",
"point_map.fits",
"README",
]
if verbose:
print(f"Found {len(expected_pixels)} partitions.")

# As a side effect, this confirms that we can load the directory as a valid dataset.
(dataset_path, dataset) = read_parquet_dataset(
pointer,
ignore_prefixes=ignore_prefixes,
exclude_invalid_files=False,
)
## Compare the pixels in _metadata with partition_info.csv
# Use both strategies of reading the partition info: strict and !strict.
metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (strict)")

metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")

parquet_path_pixels = []
for hats_file in dataset.files:
relative_path = hats_file[len(dataset_path) :]
healpix_pixel = get_healpix_from_path(relative_path)
if healpix_pixel == INVALID_PIXEL:
handle_error(f"Could not derive partition pixel from parquet path: {relative_path}")
partition_info_file = get_partition_info_pointer(pointer)
partition_info = PartitionInfo.read_from_csv(partition_info_file)
csv_pixels = sort_pixels(partition_info.get_healpix_pixels())
if not np.array_equal(expected_pixels, csv_pixels):
handle_error("Partition pixels differ between catalog and partition_info.csv file")

parquet_path_pixels.append(healpix_pixel)
parquet_path_pixels = []
for hats_file in dataset.files:
healpix_pixel = get_healpix_from_path(hats_file)
if healpix_pixel == INVALID_PIXEL:
handle_error(f"Could not derive partition pixel from parquet path: {hats_file}")
parquet_path_pixels.append(healpix_pixel)

parquet_path_pixels = sort_pixels(parquet_path_pixels)
parquet_path_pixels = sort_pixels(parquet_path_pixels)

if not np.array_equal(expected_pixels, parquet_path_pixels):
handle_error("Partition pixels differ between catalog and parquet paths")
if not np.array_equal(expected_pixels, parquet_path_pixels):
handle_error("Partition pixels differ between catalog and parquet paths")

if verbose:
# Print a few more stats
pixel_orders = [p.order for p in expected_pixels]
cov_order, cov_count = np.unique(pixel_orders, return_counts=True)
area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
total_area = (area_by_order * cov_count).sum()
print(
f"Approximate coverage is {total_area:0.2f} sq deg, or {total_area/41253*100:0.2f} % of the sky."
)
if verbose:
# Print a few more stats
print(
"Approximate coverage is "
f"{partition_info.calculate_fractional_coverage()*100:0.2f} % of the sky."
)

return is_valid
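
Validation now builds the pyarrow dataset from the _metadata file itself via pds.parquet_dataset, instead of walking the catalog directory with read_parquet_dataset and a list of ignore_prefixes, so dataset.files reflects exactly the parquet paths recorded in the footer. A rough sketch of that mechanism, with a hypothetical catalog location:

import pyarrow.dataset as pds
from upath import UPath

# Hypothetical location; local paths and remote URLs both resolve
# through universal_pathlib.
metadata_file = UPath("path/to/catalog/dataset/_metadata")

# The dataset is defined by the _metadata footer; .files lists the leaf
# parquet paths recorded there, which the validator parses into pixels.
dataset = pds.parquet_dataset(metadata_file.path, filesystem=metadata_file.fs)
for parquet_file in dataset.files:
    print(parquet_file)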

@@ -172,7 +160,7 @@ def is_catalog_info_valid(pointer: str | Path | UPath) -> bool:
return True


def is_partition_info_valid(pointer: str | Path | UPath) -> bool:
def is_partition_info_valid(pointer: UPath) -> bool:
"""Checks if partition_info is valid for a given base catalog pointer
Args:
@@ -186,7 +174,7 @@ def is_partition_info_valid(pointer: str | Path | UPath) -> bool:
return partition_info_exists


def is_metadata_valid(pointer: str | Path | UPath) -> bool:
def is_metadata_valid(pointer: UPath) -> bool:
"""Checks if _metadata is valid for a given base catalog pointer
Args:
@@ -200,7 +188,7 @@ def is_metadata_valid(pointer: str | Path | UPath) -> bool:
return metadata_file_exists


def is_common_metadata_valid(pointer: str | Path | UPath) -> bool:
def is_common_metadata_valid(pointer: UPath) -> bool:
"""Checks if _common_metadata is valid for a given base catalog pointer
Args:
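
A sketch of how the reworked check might be called, with a hypothetical catalog path. The non-strict default only verifies that the properties file plus either partition_info.csv or _metadata exist; strict mode runs the pixel cross-checks above:

from hats.io.validation import is_valid_catalog

# Hypothetical catalog root: the directory holding the properties file
# and the dataset/ subdirectory.
catalog_path = "path/to/small_sky_order1"

# Quick existence checks only.
print(is_valid_catalog(catalog_path))

# Strict mode cross-checks pixels in the catalog object, _metadata,
# partition_info.csv, and the parquet file paths; verbose prints the
# partition count and approximate sky coverage; fail_fast raises
# ValueError at the first problem instead of collecting them.
print(is_valid_catalog(catalog_path, strict=True, verbose=True, fail_fast=False))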
13 changes: 13 additions & 0 deletions tests/hats/catalog/test_catalog.py
@@ -91,6 +91,19 @@ def test_load_catalog_small_sky_order1(small_sky_order1_dir):
assert len(cat.get_healpix_pixels()) == 4


def test_aggregate_column_statistics(small_sky_order1_dir):
cat = read_hats(small_sky_order1_dir)

result_frame = cat.aggregate_column_statistics()
assert len(result_frame) == 5

result_frame = cat.aggregate_column_statistics(exclude_hats_columns=False)
assert len(result_frame) == 9

result_frame = cat.aggregate_column_statistics(include_columns=["ra", "dec"])
assert len(result_frame) == 2


def test_load_catalog_small_sky_order1_moc(small_sky_order1_dir):
"""Instantiate a catalog with 4 pixels"""
cat = read_hats(small_sky_order1_dir)
33 changes: 17 additions & 16 deletions tests/hats/io/test_validation.py
@@ -2,7 +2,6 @@

import os
import shutil
from pathlib import Path

import pytest

@@ -65,10 +64,7 @@ def test_is_valid_catalog_strict(tmp_path, small_sky_catalog, small_sky_pixels,
assert not is_valid_catalog(tmp_path, **flags)

# This outta do it! Add parquet files that match the _metadata pixels.
shutil.copytree(
Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
tmp_path / "dataset" / "Norder=0",
)
shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)

assert is_valid_catalog(tmp_path, **flags)

@@ -93,22 +89,20 @@ def test_is_valid_catalog_fail_fast(tmp_path, small_sky_catalog, small_sky_pixel

# Having the catalog_info file is not enough
small_sky_catalog.catalog_info.to_properties_file(tmp_path)
with pytest.raises(ValueError, match="_metadata"):
is_valid_catalog(tmp_path, **flags)

total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
with pytest.raises(ValueError, match="partition_info.csv"):
is_valid_catalog(tmp_path, **flags)

PartitionInfo.from_healpix(small_sky_pixels).write_to_file(catalog_path=tmp_path)
with pytest.raises(ValueError, match="_metadata"):
is_valid_catalog(tmp_path, **flags)

total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
assert total_rows == 1
with pytest.raises(ValueError, match="parquet paths"):
with pytest.raises(ValueError, match="parquet path"):
is_valid_catalog(tmp_path, **flags)

shutil.copytree(
Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
tmp_path / "dataset" / "Norder=0",
)
shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)
assert is_valid_catalog(tmp_path, **flags)


@@ -127,7 +121,6 @@ def test_is_valid_catalog_verbose_fail(tmp_path, capsys):
captured = capsys.readouterr().out
assert "Validating catalog at path" in captured
assert "properties file does not exist or is invalid" in captured
assert "partition_info.csv file does not exist" in captured
assert "_metadata file does not exist" in captured
assert "_common_metadata file does not exist" in captured

@@ -145,10 +138,16 @@ def test_is_valid_catalog_verbose_success(small_sky_dir, capsys):
captured = capsys.readouterr().out
assert "Validating catalog at path" in captured
assert "Found 1 partition" in captured
assert "Approximate coverage is 3437.75 sq deg" in captured
assert "Approximate coverage is 8" in captured


def test_valid_catalog_strict_all(small_sky_source_dir, small_sky_order1_dir, small_sky_dir):
def test_valid_catalog_strict_all(
small_sky_source_dir,
small_sky_order1_dir,
small_sky_dir,
small_sky_source_object_index_dir,
margin_catalog_path,
):
"""Check that all of our object catalogs in test data are valid, using strict mechanism"""
flags = {
"strict": True, # more intensive checks
@@ -158,3 +157,5 @@ def test_valid_catalog_strict_all(small_sky_source_dir, small_sky_order1_dir, sm
assert is_valid_catalog(small_sky_source_dir, **flags)
assert is_valid_catalog(small_sky_order1_dir, **flags)
assert is_valid_catalog(small_sky_dir, **flags)
assert is_valid_catalog(small_sky_source_object_index_dir, **flags)
assert is_valid_catalog(margin_catalog_path, **flags)
