Support Location Providers #1452
base: main
Changes from all commits: adfbd3c, ea2b456, ce5f0d5, d3e0c0f, 00917e9, c4e6be9, bc2eab8, 9999cbb, 23ef8f5
`pyiceberg/table/__init__.py`:

```diff
@@ -16,7 +16,9 @@
 # under the License.
 from __future__ import annotations
 
+import importlib
 import itertools
+import logging
 import uuid
 import warnings
 from abc import ABC, abstractmethod
```
```diff
@@ -145,6 +147,8 @@
     from pyiceberg.catalog import Catalog
 
+logger = logging.getLogger(__name__)
+
 ALWAYS_TRUE = AlwaysTrue()
 DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
```
```diff
@@ -187,6 +191,14 @@ class TableProperties:
     WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
     WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
 
+    WRITE_LOCATION_PROVIDER_IMPL = "write.location-provider.impl"
+
+    OBJECT_STORE_ENABLED = "write.object-storage.enabled"
+    OBJECT_STORE_ENABLED_DEFAULT = False
+
+    WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths"
+    WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
+
     DELETE_MODE = "write.delete.mode"
     DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
     DELETE_MODE_MERGE_ON_READ = "merge-on-read"
```

> **Review comment** (on `WRITE_LOCATION_PROVIDER_IMPL`): Though the docs say that the default is …
```diff
@@ -1611,13 +1623,6 @@ def generate_data_file_filename(self, extension: str) -> str:
         # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
         return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
 
-    def generate_data_file_path(self, extension: str) -> str:
-        if self.partition_key:
-            file_path = f"{self.partition_key.to_path()}/{self.generate_data_file_filename(extension)}"
-            return file_path
-        else:
-            return self.generate_data_file_filename(extension)
-
 
 @dataclass(frozen=True)
 class AddFileTask:
```
```diff
@@ -1627,6 +1632,67 @@ class AddFileTask:
     partition_field_value: Record
 
 
+class LocationProvider(ABC):
+    """A base class for location providers, that provide data file locations for write tasks."""
+
+    table_location: str
+    table_properties: Properties
+
+    def __init__(self, table_location: str, table_properties: Properties):
+        self.table_location = table_location
+        self.table_properties = table_properties
+
+    @abstractmethod
+    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
+        """Return a fully-qualified data file location for the given filename.
+
+        Args:
+            data_file_name (str): The name of the data file.
+            partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned.
+
+        Returns:
+            str: A fully-qualified location URI for the data file.
+        """
+
+
+def _import_location_provider(
+    location_provider_impl: str, table_location: str, table_properties: Properties
+) -> Optional[LocationProvider]:
+    try:
+        path_parts = location_provider_impl.split(".")
+        if len(path_parts) < 2:
+            raise ValueError(
+                f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}"
+            )
+        module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
+        module = importlib.import_module(module_name)
+        class_ = getattr(module, class_name)
+        return class_(table_location, table_properties)
+    except ModuleNotFoundError:
+        logger.warning("Could not initialize LocationProvider: %s", location_provider_impl)
+        return None
+
+
+def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
+    table_location = table_location.rstrip("/")
+
+    if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL):
+        if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties):
+            logger.info("Loaded LocationProvider: %s", location_provider_impl)
+            return location_provider
+        else:
+            raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}")
+
+    if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT):
+        from pyiceberg.table.locations import ObjectStoreLocationProvider
+
+        return ObjectStoreLocationProvider(table_location, table_properties)
+    else:
+        from pyiceberg.table.locations import DefaultLocationProvider
+
+        return DefaultLocationProvider(table_location, table_properties)
+
+
 def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
     """Convert a list files into DataFiles.
```

> **Review comment** (on `class LocationProvider(ABC):`): I would also expect this one to be in …

> **Review comment** (on lines +1667 to +1669, the `importlib` loading): Hmm, wonder if we should reduce duplication between this and file IO loading.
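To make the dynamic-loading path above concrete, here is a sketch of a custom provider; the module name `my_module` and the class are hypothetical, and the snippet assumes this PR's `write.location-provider.impl` property and `load_location_provider` function:

```python
# my_module.py — hypothetical, for illustration only.
from typing import Optional

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table import LocationProvider, load_location_provider


class CustomLocationProvider(LocationProvider):
    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        # Ignore partitioning and write everything under a flat prefix.
        return f"{self.table_location}/custom-data/{data_file_name}"


# Exercises _import_location_provider via the table property:
provider = load_location_provider(
    table_location="s3://bucket/table",
    table_properties={"write.location-provider.impl": "my_module.CustomLocationProvider"},
)
print(provider.new_data_location("datafile.parquet"))
# -> s3://bucket/table/custom-data/datafile.parquet
```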
`pyiceberg/table/locations.py` (new file):

```python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

import mmh3

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table import LocationProvider, TableProperties
from pyiceberg.typedef import Properties
from pyiceberg.utils.properties import property_as_bool


class DefaultLocationProvider(LocationProvider):
    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)

    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        prefix = f"{self.table_location}/data"
        return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"
```

> **Review comment** (on `DefaultLocationProvider`): The biggest difference vs the Java implementations is that I've not supported …
```python
HASH_BINARY_STRING_BITS = 20
ENTROPY_DIR_LENGTH = 4
ENTROPY_DIR_DEPTH = 3


class ObjectStoreLocationProvider(LocationProvider):
    _include_partition_paths: bool

    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)
        self._include_partition_paths = property_as_bool(
            self.table_properties,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT,
        )

    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        if self._include_partition_paths and partition_key:
            return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")

        prefix = f"{self.table_location}/data"
        hashed_path = self._compute_hash(data_file_name)

        return (
            f"{prefix}/{hashed_path}/{data_file_name}"
            if self._include_partition_paths
            else f"{prefix}/{hashed_path}-{data_file_name}"
        )
```

> **Review comment** (on `new_data_location`): Tried to make this as consistent with its Java counterpart so file locations are consistent too. This means hashing on both the partition key and the data file name below, and using the same hash function. Seemed reasonable to port over the object storage stuff in this PR, given that the original issue #861 mentions this.

> **Review comment** (on `new_data_location`): Since Iceberg is mainly focussed on object-stores, I'm leaning towards making the …

> **Review comment** (on the `f"{prefix}/{hashed_path}-{data_file_name}"` branch): Interesting that disabling …
```python
    @staticmethod
    def _compute_hash(data_file_name: str) -> str:
        # Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip.
        hash_code = mmh3.hash(data_file_name) & ((1 << HASH_BINARY_STRING_BITS) - 1) | (1 << HASH_BINARY_STRING_BITS)
        return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-HASH_BINARY_STRING_BITS:])

    @staticmethod
    def _dirs_from_hash(file_hash: str) -> str:
        """Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH."""
        hash_with_dirs = []
        for i in range(0, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, ENTROPY_DIR_LENGTH):
            hash_with_dirs.append(file_hash[i : i + ENTROPY_DIR_LENGTH])

        if len(file_hash) > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH:
            hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH :])

        return "/".join(hash_with_dirs)
```
> **Review comment** (on `load_location_provider`): Don't love this. I wanted to do something like this and cache on at least the `Transaction` (which this method is exclusively invoked by) but the problem I think is that properties can change on the `Transaction`, potentially changing the location provider to be used. I suppose we can update that provider on a property change (or maybe any metadata change) but unsure if this complexity is even worth it.
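For what it's worth, a minimal sketch of that caching idea, assuming a hypothetical wrapper held by the `Transaction` that rebuilds the provider whenever the property snapshot changes:

```python
from typing import Optional

from pyiceberg.table import LocationProvider, load_location_provider
from pyiceberg.typedef import Properties


class CachedLocationProvider:
    """Hypothetical wrapper: rebuild the provider only when table properties change."""

    def __init__(self, table_location: str) -> None:
        self._table_location = table_location
        self._provider: Optional[LocationProvider] = None
        self._cached_properties: Optional[Properties] = None

    def get(self, table_properties: Properties) -> LocationProvider:
        # A property change (e.g. via Transaction.set_properties) invalidates the cache.
        if self._provider is None or table_properties != self._cached_properties:
            self._provider = load_location_provider(self._table_location, table_properties)
            self._cached_properties = dict(table_properties)
        return self._provider
```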