Support Location Providers #1452

Open · wants to merge 9 commits into base: main
Changes from 5 commits
16 changes: 14 additions & 2 deletions pyiceberg/io/pyarrow.py
@@ -136,6 +136,10 @@
visit,
visit_with_partner,
)
from pyiceberg.table import (
LocationProvider,
load_location_provider,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.transforms import TruncateTransform
@@ -2415,7 +2419,9 @@ def data_file_statistics_from_parquet_metadata(
)


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
def write_file(
io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask]
) -> Iterator[DataFile]:
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
@@ -2446,7 +2452,10 @@ def write_parquet(task: WriteTask) -> DataFile:
for batch in task.record_batches
]
arrow_table = pa.Table.from_batches(batches)
file_path = f"{table_metadata.location}/data/{task.generate_data_file_path('parquet')}"
file_path = location_provider.new_data_location(
data_file_name=task.generate_data_file_filename("parquet"),
partition_key=task.partition_key,
)
fo = io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer:
@@ -2622,13 +2631,15 @@ def _dataframe_to_data_files(
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)
location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
Author comment:
Don't love this. I wanted to cache the provider on at least the Transaction (which is the only caller of this method), but the problem is that properties can change on the Transaction, potentially changing which location provider should be used. We could update the provider on a property change (or maybe on any metadata change), but I'm not sure that complexity is worth it.
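
For illustration, a minimal sketch of the cache-and-invalidate idea described above, assuming a hypothetical write-context class; none of these names exist in this PR or in pyiceberg.

from typing import Dict, Optional

from pyiceberg.table import LocationProvider, load_location_provider


class _CachingWriteContext:
    """Hypothetical sketch: cache the provider, rebuild it only when properties change."""

    def __init__(self, table_location: str, properties: Dict[str, str]) -> None:
        self._table_location = table_location
        self._properties = properties
        self._provider: Optional[LocationProvider] = None

    @property
    def location_provider(self) -> LocationProvider:
        # Build lazily and reuse the same provider across write tasks.
        if self._provider is None:
            self._provider = load_location_provider(
                table_location=self._table_location, table_properties=self._properties
            )
        return self._provider

    def set_properties(self, updates: Dict[str, str]) -> None:
        # A property change (e.g. write.object-storage.enabled) may select a
        # different provider, so drop the cached one and rebuild on next access.
        self._properties.update(updates)
        self._provider = None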

name_mapping = table_metadata.schema().name_mapping
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)

if table_metadata.spec().is_unpartitioned():
yield from write_file(
io=io,
location_provider=location_provider,
table_metadata=table_metadata,
tasks=iter([
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
@@ -2639,6 +2650,7 @@
partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
yield from write_file(
io=io,
location_provider=location_provider,
table_metadata=table_metadata,
tasks=iter([
WriteTask(
81 changes: 73 additions & 8 deletions pyiceberg/table/__init__.py
@@ -16,7 +16,9 @@
# under the License.
from __future__ import annotations

import importlib
import itertools
import logging
import uuid
import warnings
from abc import ABC, abstractmethod
@@ -138,7 +140,6 @@
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.deprecated import deprecation_message as deprecation_message
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
@@ -150,6 +151,8 @@

from pyiceberg.catalog import Catalog

logger = logging.getLogger(__name__)

ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"

@@ -192,6 +195,14 @@ class TableProperties:
WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0

WRITE_LOCATION_PROVIDER_IMPL = "write.location-provider.impl"
Author comment:
Though the docs say that the default is null, having a constant for this set to None felt unnecessary.


OBJECT_STORE_ENABLED = "write.object-storage.enabled"
OBJECT_STORE_ENABLED_DEFAULT = False

WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths"
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True

DELETE_MODE = "write.delete.mode"
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
DELETE_MODE_MERGE_ON_READ = "merge-on-read"
@@ -1616,13 +1627,6 @@ def generate_data_file_filename(self, extension: str) -> str:
# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
return f"00000-{self.task_id}-{self.write_uuid}.{extension}"

def generate_data_file_path(self, extension: str) -> str:
if self.partition_key:
file_path = f"{self.partition_key.to_path()}/{self.generate_data_file_filename(extension)}"
return file_path
else:
return self.generate_data_file_filename(extension)


@dataclass(frozen=True)
class AddFileTask:
@@ -1632,6 +1636,67 @@ class AddFileTask:
partition_field_value: Record


class LocationProvider(ABC):
Contributor comment:
I would also expect this one to be in locations.py? table/__init__.py is already pretty big.

"""A base class for location providers, that provide data file locations for write tasks."""

table_location: str
table_properties: Properties

def __init__(self, table_location: str, table_properties: Properties):
self.table_location = table_location
self.table_properties = table_properties

@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
"""Return a fully-qualified data file location for the given filename.

Args:
data_file_name (str): The name of the data file.
partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned.

Returns:
str: A fully-qualified location URI for the data file.
"""


def _import_location_provider(
location_provider_impl: str, table_location: str, table_properties: Properties
) -> Optional[LocationProvider]:
try:
path_parts = location_provider_impl.split(".")
if len(path_parts) < 2:
raise ValueError(
f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}"
)
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
module = importlib.import_module(module_name)
class_ = getattr(module, class_name)
Comment on lines +1667 to +1669
Author comment:
Hmm, wonder if we should reduce duplication between this and file IO loading.

return class_(table_location, table_properties)
except ModuleNotFoundError:
logger.warning("Could not initialize LocationProvider: %s", location_provider_impl)
return None
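
As a possible follow-up to the comment above, a hypothetical shared helper that both FileIO and location-provider loading could delegate to; it is not part of this PR.

import importlib
from typing import Optional


def _import_class(qualified_name: str, property_name: str) -> Optional[type]:
    # Split "module.submodule.ClassName" into a module path and a class name.
    path_parts = qualified_name.split(".")
    if len(path_parts) < 2:
        raise ValueError(f"{property_name} should be full path (module.ClassName), got: {qualified_name}")
    module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
    try:
        return getattr(importlib.import_module(module_name), class_name)
    except (ModuleNotFoundError, AttributeError):
        # The caller decides whether a missing module or class is a warning or an error.
        return None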


def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
table_location = table_location.rstrip("/")

if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL):
if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties):
logger.info("Loaded LocationProvider: %s", location_provider_impl)
return location_provider
else:
raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}")

if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT):
from pyiceberg.table.locations import ObjectStoreLocationProvider

return ObjectStoreLocationProvider(table_location, table_properties)
else:
from pyiceberg.table.locations import DefaultLocationProvider

return DefaultLocationProvider(table_location, table_properties)
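
For a quick sense of the resolution order above (custom impl, then object storage, then the default), a usage sketch; the bucket path is a placeholder and the hashed entropy directories are shown as x's.

from pyiceberg.table import load_location_provider

# No relevant properties set -> DefaultLocationProvider.
provider = load_location_provider(table_location="s3://bucket/table", table_properties={})
print(provider.new_data_location("file.parquet"))
# s3://bucket/table/data/file.parquet

# write.object-storage.enabled -> ObjectStoreLocationProvider with hashed entropy directories.
provider = load_location_provider(
    table_location="s3://bucket/table",
    table_properties={"write.object-storage.enabled": "true"},
)
print(provider.new_data_location("file.parquet"))
# s3://bucket/table/data/xxxx/xxxx/xxxx/xxxxxxxx/file.parquet (hash of the file name)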


def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.

82 changes: 82 additions & 0 deletions pyiceberg/table/locations.py
@@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Optional

import mmh3

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table import LocationProvider, TableProperties
from pyiceberg.typedef import Properties
from pyiceberg.utils.properties import property_as_bool


class DefaultLocationProvider(LocationProvider):
Author comment (@smaheshwar-pltr, Dec 20, 2024):
The biggest difference vs. the Java implementation is that I haven't supported write.data.path here. I think it's natural for write.metadata.path to be supported alongside it, which would make this a larger and arguably location-provider-independent change. I can look into it as a follow-up.

def __init__(self, table_location: str, table_properties: Properties):
super().__init__(table_location, table_properties)

def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
prefix = f"{self.table_location}/data"
return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"


HASH_BINARY_STRING_BITS = 20
ENTROPY_DIR_LENGTH = 4
ENTROPY_DIR_DEPTH = 3


class ObjectStoreLocationProvider(LocationProvider):
_include_partition_paths: bool

def __init__(self, table_location: str, table_properties: Properties):
super().__init__(table_location, table_properties)
self._include_partition_paths = property_as_bool(
table_properties,
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT,
)

def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
Author comment (@smaheshwar-pltr, Dec 20, 2024):
Tried to make this as consistent with its Java counterpart as possible, so that file locations are consistent too. This means hashing on both the partition key and the data file name below, and using the same hash function.

Seemed reasonable to port over the object storage stuff in this PR, given that the original issue #861 mentions it.

Contributor comment:
Since Iceberg is mainly focused on object stores, I'm leaning towards making the ObjectStoreLocationProvider the default. Java is a great source of inspiration, but it also holds a lot of historical decisions that are not easy to change, so we should reconsider this for PyIceberg.

if self._include_partition_paths and partition_key:
return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")

prefix = f"{self.table_location}/data"
hashed_path = self._compute_hash(data_file_name)

return (
f"{prefix}/{hashed_path}/{data_file_name}"
if self._include_partition_paths
else f"{prefix}/{hashed_path}-{data_file_name}"
Author comment (@smaheshwar-pltr, Dec 20, 2024):
Interesting that disabling include_partition_paths affects paths of non-partitioned data files. I've matched Java behaviour here but it does feel odd.

)
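
To make the behaviour noted in the comment above concrete, a small usage sketch; the entropy value 0110/1010/0011/11101000 for "test.parquet" comes from the tests at the end of this PR, and the first output follows from the code above.

from pyiceberg.table import load_location_provider

with_partition_dirs = load_location_provider(
    table_location="table_location",
    table_properties={"write.object-storage.enabled": "true"},
)
without_partition_dirs = load_location_provider(
    table_location="table_location",
    table_properties={
        "write.object-storage.enabled": "true",
        "write.object-storage.partitioned-paths": "false",
    },
)

# Even with no partition key at all, the flag changes the separator before the file name:
print(with_partition_dirs.new_data_location("test.parquet"))
# table_location/data/0110/1010/0011/11101000/test.parquet
print(without_partition_dirs.new_data_location("test.parquet"))
# table_location/data/0110/1010/0011/11101000-test.parquet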

@staticmethod
def _compute_hash(data_file_name: str) -> str:
# Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip.
hash_code = mmh3.hash(data_file_name) & ((1 << HASH_BINARY_STRING_BITS) - 1) | (1 << HASH_BINARY_STRING_BITS)
return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-HASH_BINARY_STRING_BITS:])
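
The bit manipulation in _compute_hash is easier to follow with a concrete value; below is a short walk-through for the file name "a", whose expected entropy directories appear in the parametrized test at the end of this PR.

import mmh3

HASH_BINARY_STRING_BITS = 20

raw = mmh3.hash("a")  # signed 32-bit murmur3 hash of the file name
masked = raw & ((1 << HASH_BINARY_STRING_BITS) - 1)  # keep only the low 20 bits, dropping the sign
padded = masked | (1 << HASH_BINARY_STRING_BITS)  # set one extra high bit so bin() keeps leading zeroes
binary = bin(padded)[-HASH_BINARY_STRING_BITS:]  # the 20-bit binary string
print(binary)  # 01010110100110110010 -> split into 0101/0110/1001/10110010 by _dirs_from_hash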

@staticmethod
def _dirs_from_hash(file_hash: str) -> str:
"""Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH."""
hash_with_dirs = []
for i in range(0, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, ENTROPY_DIR_LENGTH):
hash_with_dirs.append(file_hash[i : i + ENTROPY_DIR_LENGTH])

if len(file_hash) > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH:
hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH :])

return "/".join(hash_with_dirs)
130 changes: 130 additions & 0 deletions tests/table/test_locations.py
@@ -0,0 +1,130 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

import pytest

from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
LocationProvider,
load_location_provider,
)
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.types import NestedField, StringType

TABLE_SCHEMA = Schema(NestedField(field_id=2, name="field", field_type=StringType(), required=False))
PARTITION_FIELD = PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="part#field")
PARTITION_SPEC = PartitionSpec(PARTITION_FIELD)
PARTITION_KEY = PartitionKey(
raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example#val")],
partition_spec=PARTITION_SPEC,
schema=TABLE_SCHEMA,
)


class CustomLocationProvider(LocationProvider):
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
return f"custom_location_provider/{data_file_name}"


def test_default_location_provider() -> None:
Author comment (@smaheshwar-pltr, Dec 20, 2024):
The tests in this file are inspired by https://github.com/apache/iceberg/blob/main/core/src/test/java/org/apache/iceberg/TestLocationProvider.java.

The hash functions are the same so those constants are unchanged.

provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)

assert provider.new_data_location("my_file") == "table_location/data/my_file"


def test_custom_location_provider() -> None:
qualified_name = CustomLocationProvider.__module__ + "." + CustomLocationProvider.__name__
provider = load_location_provider(
table_location="table_location", table_properties={"write.location-provider.impl": qualified_name}
)

assert provider.new_data_location("my_file") == "custom_location_provider/my_file"


def test_custom_location_provider_single_path() -> None:
with pytest.raises(ValueError, match=r"write\.location-provider\.impl should be full path"):
load_location_provider(table_location="table_location", table_properties={"write.location-provider.impl": "not_found"})


def test_custom_location_provider_not_found() -> None:
with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"):
load_location_provider(
table_location="table_location", table_properties={"write.location-provider.impl": "module.not_found"}
)


def test_object_storage_injects_entropy() -> None:
provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"})

location = provider.new_data_location("test.parquet")
parts = location.split("/")

assert len(parts) == 7
assert parts[0] == "table_location"
assert parts[1] == "data"
# Entropy directories in the middle
assert parts[-1] == "test.parquet"


@pytest.mark.parametrize("object_storage", [True, False])
def test_partition_value_in_path(object_storage: bool) -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={
"write.object-storage.enabled": str(object_storage),
},
)

location = provider.new_data_location("test.parquet", PARTITION_KEY)
partition_segment = location.split("/")[-2]

# Field name is not encoded but partition value is - this differs from the Java implementation
# https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/test/java/org/apache/iceberg/TestLocationProvider.java#L304
assert partition_segment == "part#field=example%23val"
Author comment (@smaheshwar-pltr, Dec 20, 2024):
Put up #1457 - I'll remove this special-character testing (that the Java test counterpart does) here because it'll be tested in that PR.



def test_object_storage_exclude_partition_in_path() -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={
"write.object-storage.enabled": "true",
"write.object-storage.partitioned-paths": "false",
},
)

location = provider.new_data_location("test.parquet", PARTITION_KEY)

# No partition values are included in the path, and the last part of the entropy is separated with "-"
assert location == "table_location/data/0110/1010/0011/11101000-test.parquet"


@pytest.mark.parametrize(
["data_file_name", "expected_hash"],
[
("a", "0101/0110/1001/10110010"),
("b", "1110/0111/1110/00000011"),
("c", "0010/1101/0110/01011111"),
("d", "1001/0001/0100/01110011"),
],
)
def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"})

assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}"
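
Finally, a hedged end-to-end sketch of how the object-store layout would be enabled once this PR lands; the catalog name and table identifier are placeholders, the example path is illustrative, and it assumes Transaction.set_properties accepts a properties dict.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumes a catalog configured in .pyiceberg.yaml
table = catalog.load_table("db.events")

# Enable the object-store layout so subsequent writes go through ObjectStoreLocationProvider.
with table.transaction() as txn:
    txn.set_properties({"write.object-storage.enabled": "true"})

# New data files are then written under hashed entropy directories, e.g.
# <table_location>/data/0101/0110/1001/10110010/00000-0-<write-uuid>.parquet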