diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 66b22a7a79..4e1ccfa067 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -84,6 +84,7 @@ SnapshotLogEntry, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.statistics import StatisticsFile from pyiceberg.table.update import ( AddPartitionSpecUpdate, AddSchemaUpdate, @@ -94,12 +95,14 @@ AssertTableUUID, AssignUUIDUpdate, RemovePropertiesUpdate, + RemoveStatisticsUpdate, SetCurrentSchemaUpdate, SetDefaultSortOrderUpdate, SetDefaultSpecUpdate, SetLocationUpdate, SetPropertiesUpdate, SetSnapshotRefUpdate, + SetStatisticsUpdate, TableRequirement, TableUpdate, UpdatesAndRequirements, @@ -663,6 +666,42 @@ def update_location(self, location: str) -> Transaction: """ raise NotImplementedError("Not yet implemented") + def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> Transaction: + """Set the statistics for a snapshot. + + Args: + snapshot_id: The snapshot ID to set the statistics for. + statistics_file: The statistics file to set. + + Returns: + The alter table builder. + """ + updates = ( + SetStatisticsUpdate( + snapshot_id=snapshot_id, + statistics=statistics_file, + ), + ) + + return self._apply(updates, ()) + + def remove_statistics(self, snapshot_id: int) -> Transaction: + """Remove the statistics for a snapshot. + + Args: + snapshot_id: The snapshot ID to remove the statistics for. + + Returns: + The alter table builder. + """ + updates = ( + RemoveStatisticsUpdate( + snapshot_id=snapshot_id, + ), + ) + + return self._apply(updates, ()) + def commit_transaction(self) -> Table: """Commit the changes to the catalog. diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 1fea33010c..f44f1cf5e0 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -44,6 +44,7 @@ SortOrder, assign_fresh_sort_order_ids, ) +from pyiceberg.table.statistics import StatisticsFile from pyiceberg.typedef import ( EMPTY_DICT, IcebergBaseModel, @@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel): There is always a main branch reference pointing to the current-snapshot-id even if the refs map is null.""" + statistics: List[StatisticsFile] = Field(default_factory=list) + """A optional list of table statistics files. + Table statistics files are valid Puffin files. Statistics are + informational. A reader can choose to ignore statistics + information. Statistics support is not required to read the + table correctly. A table can contain many statistics files + associated with different table snapshots.""" + # validators @field_validator("properties", mode="before") def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]: diff --git a/pyiceberg/table/statistics.py b/pyiceberg/table/statistics.py new file mode 100644 index 0000000000..cf1a6449b5 --- /dev/null +++ b/pyiceberg/table/statistics.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import ( + Dict, + List, + Optional, +) + +from pydantic import Field + +from pyiceberg.typedef import IcebergBaseModel + + +class BlobMetadata(IcebergBaseModel): + type: str + snapshot_id: int = Field(alias="snapshot-id") + sequence_number: int = Field(alias="sequence-number") + fields: List[int] + properties: Optional[Dict[str, str]] = None + + +class StatisticsFile(IcebergBaseModel): + snapshot_id: int = Field(alias="snapshot-id") + statistics_path: str = Field(alias="statistics-path") + file_size_in_bytes: int = Field(alias="file-size-in-bytes") + file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes") + blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata") diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 6e14046f9a..decbeafa9d 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -37,6 +37,7 @@ SnapshotLogEntry, ) from pyiceberg.table.sorting import SortOrder +from pyiceberg.table.statistics import StatisticsFile from pyiceberg.typedef import ( IcebergBaseModel, Properties, @@ -172,6 +173,17 @@ class RemovePropertiesUpdate(IcebergBaseModel): removals: List[str] +class SetStatisticsUpdate(IcebergBaseModel): + action: Literal["set-statistics"] = Field(default="set-statistics") + snapshot_id: int = Field(alias="snapshot-id") + statistics: StatisticsFile + + +class RemoveStatisticsUpdate(IcebergBaseModel): + action: Literal["remove-statistics"] = Field(default="remove-statistics") + snapshot_id: int = Field(alias="snapshot-id") + + TableUpdate = Annotated[ Union[ AssignUUIDUpdate, @@ -189,6 +201,8 @@ class RemovePropertiesUpdate(IcebergBaseModel): SetLocationUpdate, SetPropertiesUpdate, RemovePropertiesUpdate, + SetStatisticsUpdate, + RemoveStatisticsUpdate, ], Field(discriminator="action"), ] @@ -477,6 +491,29 @@ def _( return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id}) +@_apply_table_update.register(SetStatisticsUpdate) +def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + if update.snapshot_id != update.statistics.snapshot_id: + raise ValueError("Snapshot id in statistics does not match the snapshot id in the update") + + rest_statistics = [stat for stat in base_metadata.statistics if stat.snapshot_id != update.snapshot_id] + + context.add_update(update) + return base_metadata.model_copy(update={"statistics": rest_statistics + [update.statistics]}) + + +@_apply_table_update.register(RemoveStatisticsUpdate) +def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + if not any(stat.snapshot_id == update.snapshot_id for stat in base_metadata.statistics): + raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist") + + statistics = [stat for stat in base_metadata.statistics if stat.snapshot_id != update.snapshot_id] + + context.add_update(update) + + return base_metadata.model_copy(update={"statistics": statistics}) + + def update_table_metadata( base_metadata: TableMetadata, updates: Tuple[TableUpdate, ...], diff --git a/tests/conftest.py b/tests/conftest.py index b05947ebe6..3e74389009 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -918,6 +918,87 @@ def generate_snapshot( "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}}, } +TABLE_METADATA_V2_WITH_STATISTICS = { + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": True, + "type": "long", + } + ], + } + ], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/1.avro", + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1, + }, + ], + "statistics": [ + { + "snapshot-id": 3051729675574597004, + "statistics-path": "s3://a/b/stats.puffin", + "file-size-in-bytes": 413, + "file-footer-size-in-bytes": 42, + "blob-metadata": [ + { + "type": "ndv", + "snapshot-id": 3051729675574597004, + "sequence-number": 1, + "fields": [1], + } + ], + }, + { + "snapshot-id": 3055729675574597004, + "statistics-path": "s3://a/b/stats.puffin", + "file-size-in-bytes": 413, + "file-footer-size-in-bytes": 42, + "blob-metadata": [ + { + "type": "ndv", + "snapshot-id": 3055729675574597004, + "sequence-number": 1, + "fields": [1], + } + ], + }, + ], + "snapshot-log": [], + "metadata-log": [], +} + @pytest.fixture def example_table_metadata_v2() -> Dict[str, Any]: @@ -929,6 +1010,11 @@ def table_metadata_v2_with_fixed_and_decimal_types() -> Dict[str, Any]: return TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES +@pytest.fixture +def table_metadata_v2_with_statistics() -> Dict[str, Any]: + return TABLE_METADATA_V2_WITH_STATISTICS + + @pytest.fixture(scope="session") def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str: from pyiceberg.io.pyarrow import PyArrowFileIO @@ -2170,6 +2256,18 @@ def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_s ) +@pytest.fixture +def table_v2_with_statistics(table_metadata_v2_with_statistics: Dict[str, Any]) -> Table: + table_metadata = TableMetadataV2(**table_metadata_v2_with_statistics) + return Table( + identifier=("database", "table"), + metadata=table_metadata, + metadata_location=f"{table_metadata.location}/uuid.metadata.json", + io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), + ) + + @pytest.fixture def bound_reference_str() -> BoundReference[str]: return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None)) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 1c4029a292..b783376804 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name +import json import uuid from copy import copy from typing import Any, Dict @@ -64,6 +65,7 @@ SortField, SortOrder, ) +from pyiceberg.table.statistics import BlobMetadata, StatisticsFile from pyiceberg.table.update import ( AddSnapshotUpdate, AddSortOrderUpdate, @@ -76,9 +78,11 @@ AssertRefSnapshotId, AssertTableUUID, RemovePropertiesUpdate, + RemoveStatisticsUpdate, SetDefaultSortOrderUpdate, SetPropertiesUpdate, SetSnapshotRefUpdate, + SetStatisticsUpdate, _apply_table_update, _TableMetadataUpdateContext, update_table_metadata, @@ -1258,3 +1262,97 @@ def test_table_module_refactoring_backward_compatibility() -> None: ) except Exception as exc: raise pytest.fail("Importing moved modules should not raise an exception") from exc + + +def test_set_statistics_update(table_v2_with_statistics: Table) -> None: + snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id + + blob_metadata = BlobMetadata( + type="boring-type", + snapshot_id=snapshot_id, + sequence_number=2, + fields=[1], + properties={"prop-key": "prop-value"}, + ) + + statistics_file = StatisticsFile( + snapshot_id=snapshot_id, + statistics_path="s3://bucket/warehouse/stats.puffin", + file_size_in_bytes=124, + file_footer_size_in_bytes=27, + blob_metadata=[blob_metadata], + ) + + update = SetStatisticsUpdate( + snapshot_id=snapshot_id, + statistics=statistics_file, + ) + + new_metadata = update_table_metadata( + table_v2_with_statistics.metadata, + (update,), + ) + + expected = """ + { + "snapshot-id": 3055729675574597004, + "statistics-path": "s3://bucket/warehouse/stats.puffin", + "file-size-in-bytes": 124, + "file-footer-size-in-bytes": 27, + "blob-metadata": [ + { + "type": "boring-type", + "snapshot-id": 3055729675574597004, + "sequence-number": 2, + "fields": [ + 1 + ], + "properties": { + "prop-key": "prop-value" + } + } + ] + }""" + + assert len(new_metadata.statistics) == 2 + + updated_statistics = [stat for stat in new_metadata.statistics if stat.snapshot_id == snapshot_id] + + assert len(updated_statistics) == 1 + assert json.loads(updated_statistics[0].model_dump_json()) == json.loads(expected) + + update = SetStatisticsUpdate( + snapshot_id=123456789, + statistics=statistics_file, + ) + + with pytest.raises( + ValueError, + match="Snapshot id in statistics does not match the snapshot id in the update", + ): + update_table_metadata( + table_v2_with_statistics.metadata, + (update,), + ) + + +def test_remove_statistics_update(table_v2_with_statistics: Table) -> None: + update = RemoveStatisticsUpdate( + snapshot_id=3055729675574597004, + ) + + remove_metadata = update_table_metadata( + table_v2_with_statistics.metadata, + (update,), + ) + + assert len(remove_metadata.statistics) == 1 + + with pytest.raises( + ValueError, + match="Statistics with snapshot id 123456789 does not exist", + ): + update_table_metadata( + table_v2_with_statistics.metadata, + (RemoveStatisticsUpdate(snapshot_id=123456789),), + )