Fix read from multiple s3 regions #1453

Open · wants to merge 16 commits into base: main
30 changes: 15 additions & 15 deletions mkdocs/docs/configuration.md
@@ -102,21 +102,21 @@ For the FileIO there are several configuration options available:

<!-- markdown-link-check-disable -->

| Key | Example | Description |
|----------------------|----------------------------|-------------|
| s3.endpoint | <https://10.0.19.25/> | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
| s3.role-session-name | session | An optional identifier for the assumed role session. |
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
| s3.signer | bearer | Configure the signature version of the FileIO. |
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign). |
- | s3.region | us-west-2 | Sets the region of the bucket |
+ | s3.region | us-west-2 | Configure the default region used to initialize an S3FileSystem. This setting will be overwritten if the bucket actually used resolves to a different region. |
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |

<!-- markdown-link-check-enable-->
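As a quick illustration of how these properties are consumed (a hypothetical sketch, not part of this PR — the bucket name and credentials below are made up), `s3.region` can be passed straight to `PyArrowFileIO`, where it now only acts as a default that per-bucket resolution may override:

```python
# Hypothetical example: the bucket name and credentials are placeholders.
from pyiceberg.io.pyarrow import PyArrowFileIO

file_io = PyArrowFileIO(
    properties={
        "s3.region": "us-west-2",  # default region, may be overridden per bucket
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
)

# Creating an input file builds (or reuses) a filesystem for the bucket;
# with this PR, the bucket's actual region takes precedence when it differs.
input_file = file_io.new_input("s3://example-bucket/path/to/data.parquet")
```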

22 changes: 13 additions & 9 deletions pyiceberg/io/pyarrow.py
@@ -352,7 +352,7 @@ def parse_location(location: str) -> Tuple[str, str, str]:

def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
if scheme in {"s3", "s3a", "s3n", "oss"}:
- from pyarrow.fs import S3FileSystem
+ from pyarrow.fs import S3FileSystem, resolve_s3_region

client_kwargs: Dict[str, Any] = {
"endpoint_override": self.properties.get(S3_ENDPOINT),
@@ -362,6 +362,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
"region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
}

# Override the default s3.region if the netloc (bucket) resolves to a different region
try:
client_kwargs["region"] = resolve_s3_region(netloc)
Contributor commented:

What about doing this lookup only when the region is not provided explicitly? I think this will do another call to S3.

jiakai-li (Contributor Author) replied on Dec 29, 2024:

Thank you Fokko. My understanding is that the problem occurs when the provided region doesn't match the region of the bucket that holds the data file, which makes the file read fail in PyArrow. By overwriting the configured region with the resolved bucket region (and falling back to the configured region when resolution fails), we make sure the real region of the bucket where a data file is stored takes precedence. (This function is cached when used through `fs_by_scheme`, so it is only called for buckets that have not been resolved before, which saves extra calls to S3.)

except (OSError, TypeError):
logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {client_kwargs['region']}")

if proxy_uri := self.properties.get(S3_PROXY_URI):
client_kwargs["proxy_options"] = proxy_uri

@@ -1326,13 +1332,14 @@ def _task_to_table(
return None


- def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
+ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
deletes_per_file: Dict[str, List[ChunkedArray]] = {}
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
if len(unique_deletes) > 0:
executor = ExecutorFactory.get_or_create()
deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
- lambda args: _read_deletes(*args), [(fs, delete) for delete in unique_deletes]
+ lambda args: _read_deletes(*args),
+ [(_fs_from_file_path(delete_file.file_path, io), delete_file) for delete_file in unique_deletes],
)
for delete in deletes_per_files:
for file, arr in delete.items():
@@ -1366,7 +1373,6 @@ def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
class ArrowScan:
_table_metadata: TableMetadata
_io: FileIO
- _fs: FileSystem
_projected_schema: Schema
_bound_row_filter: BooleanExpression
_case_sensitive: bool
@@ -1376,7 +1382,6 @@ class ArrowScan:
Attributes:
_table_metadata: Current table metadata of the Iceberg table
_io: PyIceberg FileIO implementation from which to fetch the io properties
- _fs: PyArrow FileSystem to use to read the files
_projected_schema: Iceberg Schema to project onto the data files
_bound_row_filter: Schema bound row expression to filter the data with
_case_sensitive: Case sensitivity when looking up column names
@@ -1394,7 +1399,6 @@ def __init__(
) -> None:
self._table_metadata = table_metadata
self._io = io
- self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
self._projected_schema = projected_schema
self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
self._case_sensitive = case_sensitive
@@ -1434,7 +1438,7 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
"""
- deletes_per_file = _read_all_delete_files(self._fs, tasks)
+ deletes_per_file = _read_all_delete_files(self._io, tasks)
executor = ExecutorFactory.get_or_create()

def _table_from_scan_task(task: FileScanTask) -> pa.Table:
@@ -1497,7 +1501,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
"""
- deletes_per_file = _read_all_delete_files(self._fs, tasks)
+ deletes_per_file = _read_all_delete_files(self._io, tasks)
return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

def _record_batches_from_scan_tasks_and_deletes(
@@ -1508,7 +1512,7 @@ def _record_batches_from_scan_tasks_and_deletes(
if self._limit is not None and total_row_count >= self._limit:
break
batches = _task_to_record_batches(
- self._fs,
+ _fs_from_file_path(task.file.file_path, self._io),
task,
self._bound_row_filter,
self._projected_schema,
69 changes: 67 additions & 2 deletions tests/io/test_pyarrow.py
@@ -360,10 +360,12 @@ def test_pyarrow_s3_session_properties() -> None:
**UNIFIED_AWS_SESSION_PROPERTIES,
}

with patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
s3_fileio = PyArrowFileIO(properties=session_properties)
filename = str(uuid.uuid4())

# Mock `resolve_s3_region` to prevent the location used from resolving to a different s3 region
mock_s3_region_resolver.side_effect = OSError("S3 bucket is not found")
s3_fileio.new_input(location=f"s3://warehouse/{filename}")

mock_s3fs.assert_called_with(
@@ -381,10 +383,11 @@ def test_pyarrow_unified_session_properties() -> None:
**UNIFIED_AWS_SESSION_PROPERTIES,
}

with patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
s3_fileio = PyArrowFileIO(properties=session_properties)
filename = str(uuid.uuid4())

mock_s3_region_resolver.return_value = "client.region"
s3_fileio.new_input(location=f"s3://warehouse/{filename}")

mock_s3fs.assert_called_with(
@@ -2074,3 +2077,65 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception(
_to_requested_schema(requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False)

assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value)


def test_pyarrow_file_io_fs_by_scheme_cache() -> None:
# It's better to set up multi-region MinIO servers for an integration test once the `endpoint_url` argument becomes available for `resolve_s3_region`
# Refer to: https://github.com/apache/arrow/issues/43713

pyarrow_file_io = PyArrowFileIO()
us_east_1_region = "us-east-1"
ap_southeast_2_region = "ap-southeast-2"

with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
# Call with new argument resolves region automatically
mock_s3_region_resolver.return_value = us_east_1_region
filesystem_us = pyarrow_file_io.fs_by_scheme("s3", "us-east-1-bucket")
assert filesystem_us.region == us_east_1_region
assert pyarrow_file_io.fs_by_scheme.cache_info().misses == 1 # type: ignore
assert pyarrow_file_io.fs_by_scheme.cache_info().currsize == 1 # type: ignore

# Call with different argument also resolves region automatically
mock_s3_region_resolver.return_value = ap_southeast_2_region
filesystem_ap_southeast_2 = pyarrow_file_io.fs_by_scheme("s3", "ap-southeast-2-bucket")
assert filesystem_ap_southeast_2.region == ap_southeast_2_region
assert pyarrow_file_io.fs_by_scheme.cache_info().misses == 2 # type: ignore
assert pyarrow_file_io.fs_by_scheme.cache_info().currsize == 2 # type: ignore

# Call with same argument hits cache
filesystem_us_cached = pyarrow_file_io.fs_by_scheme("s3", "us-east-1-bucket")
assert filesystem_us_cached.region == us_east_1_region
assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 1 # type: ignore

# Call with same argument hits cache
filesystem_ap_southeast_2_cached = pyarrow_file_io.fs_by_scheme("s3", "ap-southeast-2-bucket")
assert filesystem_ap_southeast_2_cached.region == ap_southeast_2_region
assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 2 # type: ignore


def test_pyarrow_io_new_input_multi_region() -> None:
# It's better to set up multi-region MinIO servers for an integration test once the `endpoint_url` argument becomes available for `resolve_s3_region`
# Refer to: https://github.com/apache/arrow/issues/43713

bucket_regions = [
("us-east-2-bucket", "us-east-2"),
("ap-southeast-2-bucket", "ap-southeast-2"),
]

def _s3_region_map(bucket: str) -> str:
for bucket_region in bucket_regions:
if bucket_region[0] == bucket:
return bucket_region[1]
raise OSError("Unknown bucket")

# For one single pyarrow io instance with configured default s3 region
pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-1"})
with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
mock_s3_region_resolver.side_effect = _s3_region_map

# The filesystem region is set by provided property by default (when bucket region cannot be resolved)
assert pyarrow_file_io.new_input("s3://non-exist-bucket/path/to/file")._filesystem.region == "ap-southeast-1"

# The filesystem region is overwritten by provided bucket region (when bucket region resolves to a different one)
for bucket_region in bucket_regions:
assert pyarrow_file_io.new_input(f"s3://{bucket_region[0]}/path/to/file")._filesystem.region == bucket_region[1]