From 7f52a1ac57a4ea97a2f82cf09196e7f082a37b2a Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Fri, 20 Dec 2024 12:39:04 +1300 Subject: [PATCH 01/20] Take netloc into account for s3 filesystem when calling `_initialize_fs` --- pyiceberg/io/pyarrow.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 9847ec5a1c..56aad26edb 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -352,14 +352,17 @@ def parse_location(location: str) -> Tuple[str, str, str]: def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: if scheme in {"s3", "s3a", "s3n", "oss"}: - from pyarrow.fs import S3FileSystem + from pyarrow.fs import S3FileSystem, resolve_s3_region + + if netloc: + netloc = resolve_s3_region(netloc) client_kwargs: Dict[str, Any] = { "endpoint_override": self.properties.get(S3_ENDPOINT), "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION), + "region": netloc or get_first_property_value(self.properties, S3_REGION, AWS_REGION), } if proxy_uri := self.properties.get(S3_PROXY_URI): From b53e89f72f739767f564e17578199200a17384ed Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Fri, 20 Dec 2024 01:33:36 +0000 Subject: [PATCH 02/20] Fix unit test for s3 fileystem --- pyiceberg/io/pyarrow.py | 8 ++++++-- tests/io/test_pyarrow.py | 6 ++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 56aad26edb..bc2c0f2051 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -354,15 +354,19 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste if scheme in {"s3", "s3a", "s3n", "oss"}: from pyarrow.fs import S3FileSystem, resolve_s3_region + bucket_region = None if netloc: - netloc = resolve_s3_region(netloc) + try: + bucket_region = resolve_s3_region(netloc) + except OSError: + pass client_kwargs: Dict[str, Any] = { "endpoint_override": self.properties.get(S3_ENDPOINT), "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": netloc or get_first_property_value(self.properties, S3_REGION, AWS_REGION), + "region": bucket_region or get_first_property_value(self.properties, S3_REGION, AWS_REGION), } if proxy_uri := self.properties.get(S3_PROXY_URI): diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e4017e1df5..58c9fe8469 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -360,10 +360,11 @@ def test_pyarrow_s3_session_properties() -> None: **UNIFIED_AWS_SESSION_PROPERTIES, } - with patch("pyarrow.fs.S3FileSystem") as mock_s3fs: + with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: s3_fileio = PyArrowFileIO(properties=session_properties) filename = str(uuid.uuid4()) + mock_s3_region_resolver.return_value = "us-east-1" s3_fileio.new_input(location=f"s3://warehouse/{filename}") mock_s3fs.assert_called_with( @@ -381,10 +382,11 @@ def 
test_pyarrow_unified_session_properties() -> None: **UNIFIED_AWS_SESSION_PROPERTIES, } - with patch("pyarrow.fs.S3FileSystem") as mock_s3fs: + with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: s3_fileio = PyArrowFileIO(properties=session_properties) filename = str(uuid.uuid4()) + mock_s3_region_resolver.return_value = None s3_fileio.new_input(location=f"s3://warehouse/{filename}") mock_s3fs.assert_called_with( From eb5e491ce7c9b2f859c6af1b368f17f7af6c21f8 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sun, 22 Dec 2024 09:43:29 +1300 Subject: [PATCH 03/20] Update ArrowScan to use different FileSystem per file --- pyiceberg/io/pyarrow.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e701dd8717..02adda010f 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1333,13 +1333,14 @@ def _task_to_table( return None -def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]: +def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]: deletes_per_file: Dict[str, List[ChunkedArray]] = {} unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks])) if len(unique_deletes) > 0: executor = ExecutorFactory.get_or_create() deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map( - lambda args: _read_deletes(*args), [(fs, delete) for delete in unique_deletes] + lambda args: _read_deletes(*args), + [(_fs_from_file_path(delete_file.file_path, io), delete_file) for delete_file in unique_deletes], ) for delete in deletes_per_files: for file, arr in delete.items(): @@ -1373,7 +1374,6 @@ def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem: class ArrowScan: _table_metadata: TableMetadata _io: FileIO - _fs: FileSystem _projected_schema: Schema _bound_row_filter: BooleanExpression _case_sensitive: bool @@ -1383,7 +1383,6 @@ class ArrowScan: Attributes: _table_metadata: Current table metadata of the Iceberg table _io: PyIceberg FileIO implementation from which to fetch the io properties - _fs: PyArrow FileSystem to use to read the files _projected_schema: Iceberg Schema to project onto the data files _bound_row_filter: Schema bound row expression to filter the data with _case_sensitive: Case sensitivity when looking up column names @@ -1401,7 +1400,6 @@ def __init__( ) -> None: self._table_metadata = table_metadata self._io = io - self._fs = _fs_from_file_path(table_metadata.location, io) # TODO: use different FileSystem per file self._projected_schema = projected_schema self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive) self._case_sensitive = case_sensitive @@ -1441,7 +1439,7 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table: ResolveError: When a required field cannot be found in the file ValueError: When a field type in the file cannot be projected to the schema type """ - deletes_per_file = _read_all_delete_files(self._fs, tasks) + deletes_per_file = _read_all_delete_files(self._io, tasks) executor = ExecutorFactory.get_or_create() def _table_from_scan_task(task: FileScanTask) -> pa.Table: @@ -1504,7 +1502,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record ResolveError: When a required field cannot be found in the file ValueError: When a field type in the file cannot be projected to the schema type 
""" - deletes_per_file = _read_all_delete_files(self._fs, tasks) + deletes_per_file = _read_all_delete_files(self._io, tasks) return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file) def _record_batches_from_scan_tasks_and_deletes( @@ -1515,7 +1513,7 @@ def _record_batches_from_scan_tasks_and_deletes( if self._limit is not None and total_row_count >= self._limit: break batches = _task_to_record_batches( - self._fs, + _fs_from_file_path(task.file.file_path, self._io), task, self._bound_row_filter, self._projected_schema, From 0c61ac8f828e11e87b64fd51406252ca678cdf9d Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sun, 22 Dec 2024 11:12:31 +1300 Subject: [PATCH 04/20] Add unit test for `PyArrorFileIO.fs_by_scheme` cache behavior --- pyiceberg/io/pyarrow.py | 15 +++++++-------- tests/io/test_pyarrow.py | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 02adda010f..605485a873 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -354,19 +354,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste if scheme in {"s3", "s3a", "s3n", "oss"}: from pyarrow.fs import S3FileSystem, resolve_s3_region - bucket_region = None - if netloc: - try: - bucket_region = resolve_s3_region(netloc) - except OSError: - pass - client_kwargs: Dict[str, Any] = { "endpoint_override": self.properties.get(S3_ENDPOINT), "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": bucket_region or get_first_property_value(self.properties, S3_REGION, AWS_REGION), + "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION), } if proxy_uri := self.properties.get(S3_PROXY_URI): @@ -384,6 +377,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False) + # Override the default s3.region if netloc(bucket) resolves to a different region + try: + client_kwargs["region"] = resolve_s3_region(netloc) + except OSError: + pass + return S3FileSystem(**client_kwargs) elif scheme in ("hdfs", "viewfs"): from pyarrow.fs import HadoopFileSystem diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 58c9fe8469..9f83eea339 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -386,7 +386,7 @@ def test_pyarrow_unified_session_properties() -> None: s3_fileio = PyArrowFileIO(properties=session_properties) filename = str(uuid.uuid4()) - mock_s3_region_resolver.return_value = None + mock_s3_region_resolver.return_value = "client.region" s3_fileio.new_input(location=f"s3://warehouse/{filename}") mock_s3fs.assert_called_with( @@ -2076,3 +2076,34 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception( _to_requested_schema(requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False) assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value) + + +def test_pyarrow_file_io_fs_by_scheme_cache() -> None: + pyarrow_file_io = PyArrowFileIO() + us_east_1_region = "us-eas1-1" + 
ap_southeast_2_region = "ap-southeast-2" + + with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: + # Call with new argument resolves region automatically + mock_s3_region_resolver.return_value = us_east_1_region + filesystem_us = pyarrow_file_io.fs_by_scheme("s3", "us-east-1-bucket") + assert filesystem_us.region == us_east_1_region + assert pyarrow_file_io.fs_by_scheme.cache_info().misses == 1 # type: ignore + assert pyarrow_file_io.fs_by_scheme.cache_info().currsize == 1 # type: ignore + + # Call with different argument also resolves region automatically + mock_s3_region_resolver.return_value = ap_southeast_2_region + filesystem_ap_southeast_2 = pyarrow_file_io.fs_by_scheme("s3", "ap-southeast-2-bucket") + assert filesystem_ap_southeast_2.region == ap_southeast_2_region + assert pyarrow_file_io.fs_by_scheme.cache_info().misses == 2 # type: ignore + assert pyarrow_file_io.fs_by_scheme.cache_info().currsize == 2 # type: ignore + + # Call with same argument hits cache + filesystem_us_cached = pyarrow_file_io.fs_by_scheme("s3", "us-east-1-bucket") + assert filesystem_us_cached.region == us_east_1_region + assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 1 # type: ignore + + # Call with same argument hits cache + filesystem_ap_southeast_2_cached = pyarrow_file_io.fs_by_scheme("s3", "ap-southeast-2-bucket") + assert filesystem_ap_southeast_2_cached.region == ap_southeast_2_region + assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 2 # type: ignore From 327dbac75d10e817c7c0d83d8d22580c5ce9a3a3 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Mon, 23 Dec 2024 12:56:31 +1300 Subject: [PATCH 05/20] Add error handling --- pyiceberg/io/pyarrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 605485a873..d2432e80e7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -380,7 +380,7 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste # Override the default s3.region if netloc(bucket) resolves to a different region try: client_kwargs["region"] = resolve_s3_region(netloc) - except OSError: + except (OSError, TypeError): pass return S3FileSystem(**client_kwargs) From b4fccf29a9a9be4107ccfab7bb602a2b82a16a62 Mon Sep 17 00:00:00 2001 From: Jiakai Li <50531391+jiakai-li@users.noreply.github.com> Date: Tue, 24 Dec 2024 10:03:39 +1300 Subject: [PATCH 06/20] Update tests/io/test_pyarrow.py Co-authored-by: Kevin Liu --- tests/io/test_pyarrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 9f83eea339..05417f1490 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2080,7 +2080,7 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception( def test_pyarrow_file_io_fs_by_scheme_cache() -> None: pyarrow_file_io = PyArrowFileIO() - us_east_1_region = "us-eas1-1" + us_east_1_region = "us-east-1" ap_southeast_2_region = "ap-southeast-2" with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: From 48bb8111ab1484532128e985d7de94f4bf0df331 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Tue, 24 Dec 2024 10:32:14 +1300 Subject: [PATCH 07/20] Update `s3.region` document and a test case --- mkdocs/docs/configuration.md | 30 +++++++++++++++--------------- tests/io/test_pyarrow.py | 3 ++- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 621b313613..8c3c3d0a57 
100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -102,21 +102,21 @@ For the FileIO there are several configuration options available: -| Key | Example | Description | -|----------------------|----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| s3.endpoint | | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. | -| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. | -| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | -| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | -| s3.role-session-name | session | An optional identifier for the assumed role session. | -| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. | -| s3.signer | bearer | Configure the signature version of the FileIO. | -| s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. | -| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). | -| s3.region | us-west-2 | Sets the region of the bucket | -| s3.proxy-uri | | Configure the proxy server to be used by the FileIO. | -| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. | -| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. | +| Key | Example | Description | +|----------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| s3.endpoint | | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. | +| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. | +| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. | +| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. | +| s3.role-session-name | session | An optional identifier for the assumed role session. | +| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. | +| s3.signer | bearer | Configure the signature version of the FileIO. 
| +| s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. | +| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). | +| s3.region | us-west-2 | Configure the default region used to instantiate an s3 filesystem. This setting will be overwritten if the bucket actually used resolves to a different region. | +| s3.proxy-uri | | Configure the proxy server to be used by the FileIO. | +| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. | +| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. | diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 05417f1490..e960374f72 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -364,7 +364,8 @@ def test_pyarrow_s3_session_properties() -> None: s3_fileio = PyArrowFileIO(properties=session_properties) filename = str(uuid.uuid4()) - mock_s3_region_resolver.return_value = "us-east-1" + # Mock `resolve_s3_region` to prevent from the location used resolving to a different s3 region + mock_s3_region_resolver.side_effect = OSError("S3 bucket is not found") s3_fileio.new_input(location=f"s3://warehouse/{filename}") mock_s3fs.assert_called_with( From 8404e6b9c36daf1a71316efededfe4d99bfe08d2 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Tue, 24 Dec 2024 14:51:48 +1300 Subject: [PATCH 08/20] Add test case for `PyArrowFileIO.new_input` multi region --- tests/io/test_pyarrow.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e960374f72..2c863611c3 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2108,3 +2108,28 @@ def test_pyarrow_file_io_fs_by_scheme_cache() -> None: filesystem_ap_southeast_2_cached = pyarrow_file_io.fs_by_scheme("s3", "ap-southeast-2-bucket") assert filesystem_ap_southeast_2_cached.region == ap_southeast_2_region assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 2 # type: ignore + + +def test_pyarrow_io_new_input_multi_region() -> None: + bucket_regions = [ + ("us-east-2-bucket", "us-east-2"), + ("ap-southeast-2-bucket", "ap-southeast-2"), + ] + + def _s3_region_map(bucket: str) -> str: + for bucket_region in bucket_regions: + if bucket_region[0] == bucket: + return bucket_region[1] + raise OSError("Unknown bucket") + + # For one single pyarrow io instance with configured default s3 region + pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-2"}) + with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: + mock_s3_region_resolver.side_effect = _s3_region_map + + # The filesystem region is set by provided property by default (when bucket region cannot be resolved) + assert pyarrow_file_io.new_input("s3://non-exist-bucket/path/to/file")._filesystem.region == "ap-southeast-2" + + # The filesystem region is overwritten by provided bucket region (when bucket region resolves to a different one) + for bucket_region in bucket_regions: + assert pyarrow_file_io.new_input(f"s3://{bucket_region[0]}/path/to/file")._filesystem.region == bucket_region[1] 
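A note on the behaviour exercised by `test_pyarrow_io_new_input_multi_region` above: the resolve-then-fallback logic can be sketched in isolation with plain PyArrow. This is only an illustration, not part of the series; the bucket name and default region below are made up, and `resolve_s3_region` performs a real lookup, so it only succeeds when the bucket is actually reachable.

    from pyarrow.fs import S3FileSystem, resolve_s3_region

    def s3_fs_for_bucket(bucket: str, default_region: str = "us-west-2") -> S3FileSystem:
        # Prefer the region the bucket actually lives in; fall back to the configured
        # default when resolution fails (unknown bucket, no network, custom endpoint, ...).
        try:
            region = resolve_s3_region(bucket)
        except OSError:
            region = default_region
        return S3FileSystem(region=region)

    fs = s3_fs_for_bucket("some-example-bucket")  # hypothetical bucket name
    print(fs.region)
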
From 53951f544d371cea44b5f30d0d20180da1f0e903 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Tue, 24 Dec 2024 14:59:59 +1300 Subject: [PATCH 09/20] Shuffle code location for better maintainability --- pyiceberg/io/pyarrow.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d2432e80e7..f554f35369 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -362,6 +362,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION), } + # Override the default s3.region if netloc(bucket) resolves to a different region + try: + client_kwargs["region"] = resolve_s3_region(netloc) + except (OSError, TypeError): + pass + if proxy_uri := self.properties.get(S3_PROXY_URI): client_kwargs["proxy_options"] = proxy_uri @@ -377,12 +383,6 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False) - # Override the default s3.region if netloc(bucket) resolves to a different region - try: - client_kwargs["region"] = resolve_s3_region(netloc) - except (OSError, TypeError): - pass - return S3FileSystem(**client_kwargs) elif scheme in ("hdfs", "viewfs"): from pyarrow.fs import HadoopFileSystem From 51fb6ff02d992a80d938a43f1238be2d90d45679 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Tue, 24 Dec 2024 15:08:14 +1300 Subject: [PATCH 10/20] Comment for future integration test --- tests/io/test_pyarrow.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2c863611c3..d84462b67e 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2080,6 +2080,9 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception( def test_pyarrow_file_io_fs_by_scheme_cache() -> None: + # It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region` + # Refer to: https://github.com/apache/arrow/issues/43713 + pyarrow_file_io = PyArrowFileIO() us_east_1_region = "us-east-1" ap_southeast_2_region = "ap-southeast-2" @@ -2111,6 +2114,9 @@ def test_pyarrow_file_io_fs_by_scheme_cache() -> None: def test_pyarrow_io_new_input_multi_region() -> None: + # It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region` + # Refer to: https://github.com/apache/arrow/issues/43713 + bucket_regions = [ ("us-east-2-bucket", "us-east-2"), ("ap-southeast-2-bucket", "ap-southeast-2"), From 0cd06c439a17029f51e1f927423197af63a24506 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Tue, 24 Dec 2024 15:41:46 +1300 Subject: [PATCH 11/20] Typo fix --- tests/io/test_pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index d84462b67e..2e98160d66 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2129,12 +2129,12 @@ def _s3_region_map(bucket: str) -> str: raise OSError("Unknown bucket") # For one single pyarrow io instance with configured default s3 region - pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-2"}) + pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-1"}) with 
patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: mock_s3_region_resolver.side_effect = _s3_region_map # The filesystem region is set by provided property by default (when bucket region cannot be resolved) - assert pyarrow_file_io.new_input("s3://non-exist-bucket/path/to/file")._filesystem.region == "ap-southeast-2" + assert pyarrow_file_io.new_input("s3://non-exist-bucket/path/to/file")._filesystem.region == "ap-southeast-1" # The filesystem region is overwritten by provided bucket region (when bucket region resolves to a different one) for bucket_region in bucket_regions: From 64fbdabf8f1455b8fa0f449254abd25cd9b77270 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Tue, 24 Dec 2024 15:51:55 +1300 Subject: [PATCH 12/20] Document wording --- mkdocs/docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 8c3c3d0a57..e6afa9fb32 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -113,7 +113,7 @@ For the FileIO there are several configuration options available: | s3.signer | bearer | Configure the signature version of the FileIO. | | s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. | | s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). | -| s3.region | us-west-2 | Configure the default region used to instantiate an s3 filesystem. This setting will be overwritten if the bucket actually used resolves to a different region. | +| s3.region | us-west-2 | Configure the default region used to initialize an S3FileSystem. This setting will be overwritten if the bucket actually used resolves to a different region. | | s3.proxy-uri | | Configure the proxy server to be used by the FileIO. | | s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. | | s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. 
| From 37d9ec2f4cf3abee220085da7067c68a58e80a84 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Mon, 30 Dec 2024 08:25:15 +1300 Subject: [PATCH 13/20] Add warning when the bucket region for a file cannot be resolved (for `pyarrow.S3FileSystem`) --- pyiceberg/io/pyarrow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f554f35369..8b9b0898d7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -366,6 +366,7 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste try: client_kwargs["region"] = resolve_s3_region(netloc) except (OSError, TypeError): + logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {client_kwargs["region"]}") pass if proxy_uri := self.properties.get(S3_PROXY_URI): From 4ff4a7d769f6017dac8ae56360b6a66a048f8768 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sun, 29 Dec 2024 20:36:38 +0000 Subject: [PATCH 14/20] Fix code linting --- pyiceberg/io/pyarrow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 8b9b0898d7..cd4357f5a7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -366,8 +366,7 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste try: client_kwargs["region"] = resolve_s3_region(netloc) except (OSError, TypeError): - logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {client_kwargs["region"]}") - pass + logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {client_kwargs['region']}") if proxy_uri := self.properties.get(S3_PROXY_URI): client_kwargs["proxy_options"] = proxy_uri From b56f2ee988e68be95fd4cc40c651dce437aed284 Mon Sep 17 00:00:00 2001 From: Jiakai Li <50531391+jiakai-li@users.noreply.github.com> Date: Sat, 4 Jan 2025 12:56:34 +1300 Subject: [PATCH 15/20] Update mkdocs/docs/configuration.md Co-authored-by: Kevin Liu --- mkdocs/docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index e6afa9fb32..06eaac1bed 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -113,7 +113,7 @@ For the FileIO there are several configuration options available: | s3.signer | bearer | Configure the signature version of the FileIO. | | s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. | | s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). | -| s3.region | us-west-2 | Configure the default region used to initialize an S3FileSystem. This setting will be overwritten if the bucket actually used resolves to a different region. | +| s3.region | us-west-2 | Configure the default region used to initialize an `S3FileSystem`. `PyArrowFileIO` attempts to automatically resolve the region for each S3 bucket, falling back to this value if resolution fails. | | s3.proxy-uri | | Configure the proxy server to be used by the FileIO. | | s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. | | s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. 
If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. | From 9cc3a3092fa7f3915c5d64087adad00e3b368e3f Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sat, 4 Jan 2025 00:57:30 +0000 Subject: [PATCH 16/20] Code refactoring --- pyiceberg/io/pyarrow.py | 209 +++++++++++++++++++++++++--------------- 1 file changed, 133 insertions(+), 76 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 0310c90343..a49e8e4065 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -190,13 +190,6 @@ T = TypeVar("T") -class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem): - def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile: - # In LocalFileSystem, parent directories must be first created before opening an output stream - self.create_dir(os.path.dirname(path), recursive=True) - return super().open_output_stream(path, *args, **kwargs) - - class PyArrowFile(InputFile, OutputFile): """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances. @@ -351,83 +344,147 @@ def parse_location(location: str) -> Tuple[str, str, str]: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: - if scheme in {"s3", "s3a", "s3n", "oss"}: - from pyarrow.fs import S3FileSystem, resolve_s3_region - - client_kwargs: Dict[str, Any] = { - "endpoint_override": self.properties.get(S3_ENDPOINT), - "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), - "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), - "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION), - } - - # Override the default s3.region if netloc(bucket) resolves to a different region - try: - client_kwargs["region"] = resolve_s3_region(netloc) - except (OSError, TypeError): - logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {client_kwargs['region']}") - - if proxy_uri := self.properties.get(S3_PROXY_URI): - client_kwargs["proxy_options"] = proxy_uri + """Initialize FileSystem for different scheme.""" + if scheme in {"oss"}: + return self._initialize_oss_fs(scheme, netloc) - if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): - client_kwargs["connect_timeout"] = float(connect_timeout) + elif scheme in {"s3", "s3a", "s3n"}: + return self._initialize_s3_fs(scheme, netloc) - if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN): - client_kwargs["role_arn"] = role_arn - - if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): - client_kwargs["session_name"] = session_name - - if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): - client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False) - - return S3FileSystem(**client_kwargs) elif scheme in ("hdfs", "viewfs"): - from pyarrow.fs import HadoopFileSystem - - hdfs_kwargs: Dict[str, Any] = {} - if netloc: - return HadoopFileSystem.from_uri(f"{scheme}://{netloc}") - if host := self.properties.get(HDFS_HOST): - hdfs_kwargs["host"] = host - if port := self.properties.get(HDFS_PORT): - 
# port should be an integer type - hdfs_kwargs["port"] = int(port) - if user := self.properties.get(HDFS_USER): - hdfs_kwargs["user"] = user - if kerb_ticket := self.properties.get(HDFS_KERB_TICKET): - hdfs_kwargs["kerb_ticket"] = kerb_ticket - - return HadoopFileSystem(**hdfs_kwargs) + return self._initialize_hdfs_fs(scheme, netloc) + elif scheme in {"gs", "gcs"}: - from pyarrow.fs import GcsFileSystem - - gcs_kwargs: Dict[str, Any] = {} - if access_token := self.properties.get(GCS_TOKEN): - gcs_kwargs["access_token"] = access_token - if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS): - gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration)) - if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION): - gcs_kwargs["default_bucket_location"] = bucket_location - if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT): - if self.properties.get(GCS_ENDPOINT): - deprecation_message( - deprecated_in="0.8.0", - removed_in="0.9.0", - help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead", - ) - url_parts = urlparse(endpoint) - gcs_kwargs["scheme"] = url_parts.scheme - gcs_kwargs["endpoint_override"] = url_parts.netloc + return self._initialize_gcs_fs(scheme, netloc) + + elif scheme in {"file"}: + return self._initialize_local_fs(scheme, netloc) - return GcsFileSystem(**gcs_kwargs) - elif scheme == "file": - return PyArrowLocalFileSystem() else: raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") + def _initialize_oss_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: + from pyarrow.fs import S3FileSystem + + client_kwargs: Dict[str, Any] = { + "endpoint_override": self.properties.get(S3_ENDPOINT), + "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), + "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION), + } + + if proxy_uri := self.properties.get(S3_PROXY_URI): + client_kwargs["proxy_options"] = proxy_uri + + if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): + client_kwargs["connect_timeout"] = float(connect_timeout) + + if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN): + client_kwargs["role_arn"] = role_arn + + if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): + client_kwargs["session_name"] = session_name + + if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): + client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False) + + return S3FileSystem(**client_kwargs) + + def _initialize_s3_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: + from pyarrow.fs import S3FileSystem, resolve_s3_region + + # Resolve region from netloc(bucket), fallback to user-provided region + provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION) + + try: + bucket_region = resolve_s3_region(netloc) + except (OSError, TypeError): + bucket_region = None + logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {provided_region}") + + bucket_region = bucket_region or provided_region + if bucket_region != provided_region: + logger.warning( + f"PyArrow FileIO overriding S3 bucket 
region for bucket {netloc}: " + f"provided region {provided_region}, actual region {bucket_region}" + ) + + client_kwargs: Dict[str, Any] = { + "endpoint_override": self.properties.get(S3_ENDPOINT), + "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), + "region": bucket_region, + } + + if proxy_uri := self.properties.get(S3_PROXY_URI): + client_kwargs["proxy_options"] = proxy_uri + + if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): + client_kwargs["connect_timeout"] = float(connect_timeout) + + if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN): + client_kwargs["role_arn"] = role_arn + + if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): + client_kwargs["session_name"] = session_name + + if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING): + client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False) + + return S3FileSystem(**client_kwargs) + + def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: + from pyarrow.fs import HadoopFileSystem + + hdfs_kwargs: Dict[str, Any] = {} + if netloc: + return HadoopFileSystem.from_uri(f"{scheme}://{netloc}") + if host := self.properties.get(HDFS_HOST): + hdfs_kwargs["host"] = host + if port := self.properties.get(HDFS_PORT): + # port should be an integer type + hdfs_kwargs["port"] = int(port) + if user := self.properties.get(HDFS_USER): + hdfs_kwargs["user"] = user + if kerb_ticket := self.properties.get(HDFS_KERB_TICKET): + hdfs_kwargs["kerb_ticket"] = kerb_ticket + + return HadoopFileSystem(**hdfs_kwargs) + + def _initialize_gcs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: + from pyarrow.fs import GcsFileSystem + + gcs_kwargs: Dict[str, Any] = {} + if access_token := self.properties.get(GCS_TOKEN): + gcs_kwargs["access_token"] = access_token + if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS): + gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration)) + if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION): + gcs_kwargs["default_bucket_location"] = bucket_location + if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT): + if self.properties.get(GCS_ENDPOINT): + deprecation_message( + deprecated_in="0.8.0", + removed_in="0.9.0", + help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead", + ) + url_parts = urlparse(endpoint) + gcs_kwargs["scheme"] = url_parts.scheme + gcs_kwargs["endpoint_override"] = url_parts.netloc + + return GcsFileSystem(**gcs_kwargs) + + def _initialize_local_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: + class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem): + def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile: + # In LocalFileSystem, parent directories must be first created before opening an output stream + self.create_dir(os.path.dirname(path), recursive=True) + return super().open_output_stream(path, *args, **kwargs) + + return PyArrowLocalFileSystem() + def new_input(self, location: str) -> PyArrowFile: """Get a PyArrowFile instance to read bytes from the file at the given location. 
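For context on how the per-scheme initializers above are consumed: `PyArrowFileIO` exposes them through a cached `fs_by_scheme` callable, and the `cache_info()` assertions in the PATCH 04 test indicate a `functools.lru_cache`-style wrapper keyed on `(scheme, netloc)`. A simplified stand-in for that pattern (the class below is illustrative, not the real `PyArrowFileIO`):

    from functools import lru_cache
    from typing import Optional

    from pyarrow.fs import FileSystem, LocalFileSystem

    class SketchFileIO:
        """Illustrative stand-in for PyArrowFileIO's filesystem caching."""

        def __init__(self) -> None:
            # One cache entry per (scheme, netloc): buckets in different regions
            # must not share a single S3FileSystem instance.
            self.fs_by_scheme = lru_cache()(self._initialize_fs)

        def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
            if scheme == "file":
                return LocalFileSystem()
            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

    io = SketchFileIO()
    io.fs_by_scheme("file", None)
    io.fs_by_scheme("file", None)
    print(io.fs_by_scheme.cache_info())  # CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)
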
From ba5ef76f253800b5a928a69290ea191eeeea044a Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sat, 4 Jan 2025 03:22:32 +0000 Subject: [PATCH 17/20] Unit test --- tests/io/test_pyarrow.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2e98160d66..16737c1f2f 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -27,7 +27,7 @@ import pyarrow as pa import pyarrow.parquet as pq import pytest -from pyarrow.fs import FileType, LocalFileSystem +from pyarrow.fs import FileType, LocalFileSystem, S3FileSystem from pyiceberg.exceptions import ResolveError from pyiceberg.expressions import ( @@ -2139,3 +2139,16 @@ def _s3_region_map(bucket: str) -> str: # The filesystem region is overwritten by provided bucket region (when bucket region resolves to a different one) for bucket_region in bucket_regions: assert pyarrow_file_io.new_input(f"s3://{bucket_region[0]}/path/to/file")._filesystem.region == bucket_region[1] + + +def test_pyarrow_io_multi_fs() -> None: + pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-1"}) + + with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: + mock_s3_region_resolver.return_value = None + + # The PyArrowFileIO instance resolves s3 file input to S3FileSystem + assert isinstance(pyarrow_file_io.new_input("s3://bucket/path/to/file")._filesystem, S3FileSystem) + + # Same PyArrowFileIO instance resolves local file input to LocalFileSystem + assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem) From 8f06a15ca8b9b49fcef4b285fea1dd9556ac2963 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sat, 4 Jan 2025 03:46:46 +0000 Subject: [PATCH 18/20] Code refactoring --- pyiceberg/io/pyarrow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a49e8e4065..0310dd726e 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1396,7 +1396,7 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st executor = ExecutorFactory.get_or_create() deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map( lambda args: _read_deletes(*args), - [(_fs_from_file_path(delete_file.file_path, io), delete_file) for delete_file in unique_deletes], + [(_fs_from_file_path(io, delete_file.file_path), delete_file) for delete_file in unique_deletes], ) for delete in deletes_per_files: for file, arr in delete.items(): @@ -1408,7 +1408,7 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st return deletes_per_file -def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem: +def _fs_from_file_path(io: FileIO, file_path: str) -> FileSystem: scheme, netloc, _ = _parse_location(file_path) if isinstance(io, PyArrowFileIO): return io.fs_by_scheme(scheme, netloc) @@ -1569,7 +1569,7 @@ def _record_batches_from_scan_tasks_and_deletes( if self._limit is not None and total_row_count >= self._limit: break batches = _task_to_record_batches( - _fs_from_file_path(task.file.file_path, self._io), + _fs_from_file_path(self._io, task.file.file_path), task, self._bound_row_filter, self._projected_schema, From e5cac0220903979ed447e48f4db275711ede7fb3 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sat, 4 Jan 2025 17:31:02 +0000 Subject: [PATCH 19/20] Test cases --- tests/integration/test_reads.py | 29 +++++++++++++++++++++++++++++ tests/io/test_pyarrow.py | 25 ++++++++++++++++--------- 2 files changed, 
45 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 0279c2199a..aad1cd1106 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -19,6 +19,7 @@ import math import time import uuid +from pathlib import PosixPath from urllib.parse import urlparse import pyarrow as pa @@ -917,3 +918,31 @@ def test_table_scan_empty_table(catalog: Catalog) -> None: result_table = tbl.scan().to_arrow() assert len(result_table) == 0 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_read_from_s3_and_local_fs(catalog: Catalog, tmp_path: PosixPath) -> None: + identifier = "default.test_read_from_s3_and_local_fs" + schema = pa.schema([pa.field("colA", pa.string())]) + arrow_table = pa.Table.from_arrays([pa.array(["one"])], schema=schema) + + tmp_dir = tmp_path / "data" + tmp_dir.mkdir() + local_file = tmp_dir / "local_file.parquet" + + try: + catalog.drop_table(identifier) + except NoSuchTableError: + pass + tbl = catalog.create_table(identifier, schema=schema) + + # Append table to s3 endpoint + tbl.append(arrow_table) + + # Append a local file + pq.write_table(arrow_table, local_file) + tbl.add_files([str(local_file)]) + + result_table = tbl.scan().to_arrow() + assert result_table["colA"].to_pylist() == ["one", "one"] diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 16737c1f2f..95044e787a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=protected-access,unused-argument,redefined-outer-name - +import logging import os import tempfile import uuid @@ -2113,10 +2113,10 @@ def test_pyarrow_file_io_fs_by_scheme_cache() -> None: assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 2 # type: ignore -def test_pyarrow_io_new_input_multi_region() -> None: +def test_pyarrow_io_new_input_multi_region(caplog: Any) -> None: # It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region` # Refer to: https://github.com/apache/arrow/issues/43713 - + user_provided_region = "ap-southeast-1" bucket_regions = [ ("us-east-2-bucket", "us-east-2"), ("ap-southeast-2-bucket", "ap-southeast-2"), @@ -2128,17 +2128,24 @@ def _s3_region_map(bucket: str) -> str: return bucket_region[1] raise OSError("Unknown bucket") - # For one single pyarrow io instance with configured default s3 region - pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-1"}) + # For a pyarrow io instance with configured default s3 region + pyarrow_file_io = PyArrowFileIO({"s3.region": user_provided_region}) with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver: mock_s3_region_resolver.side_effect = _s3_region_map - # The filesystem region is set by provided property by default (when bucket region cannot be resolved) - assert pyarrow_file_io.new_input("s3://non-exist-bucket/path/to/file")._filesystem.region == "ap-southeast-1" + # The region is set to provided region if bucket region cannot be resolved + with caplog.at_level(logging.WARNING): + assert pyarrow_file_io.new_input("s3://non-exist-bucket/path/to/file")._filesystem.region == user_provided_region + assert f"Unable to resolve region for bucket non-exist-bucket, using default region {user_provided_region}" in caplog.text - # The filesystem region 
is overwritten by provided bucket region (when bucket region resolves to a different one) for bucket_region in bucket_regions: - assert pyarrow_file_io.new_input(f"s3://{bucket_region[0]}/path/to/file")._filesystem.region == bucket_region[1] + # For s3 scheme, region is overwritten by resolved bucket region if different from user provided region + with caplog.at_level(logging.WARNING): + assert pyarrow_file_io.new_input(f"s3://{bucket_region[0]}/path/to/file")._filesystem.region == bucket_region[1] + assert f"PyArrow FileIO overriding S3 bucket region for bucket {bucket_region[0]}: provided region {user_provided_region}, actual region {bucket_region[1]}" + + # For oss scheme, user provided region is used instead + assert pyarrow_file_io.new_input(f"oss://{bucket_region[0]}/path/to/file")._filesystem.region == user_provided_region def test_pyarrow_io_multi_fs() -> None: From 9652baf0b7d9208db9e84585da5c68c64c5d2072 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Sat, 4 Jan 2025 18:07:08 +0000 Subject: [PATCH 20/20] Code format --- tests/io/test_pyarrow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 95044e787a..a6cc4434f0 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2142,7 +2142,10 @@ def _s3_region_map(bucket: str) -> str: # For s3 scheme, region is overwritten by resolved bucket region if different from user provided region with caplog.at_level(logging.WARNING): assert pyarrow_file_io.new_input(f"s3://{bucket_region[0]}/path/to/file")._filesystem.region == bucket_region[1] - assert f"PyArrow FileIO overriding S3 bucket region for bucket {bucket_region[0]}: provided region {user_provided_region}, actual region {bucket_region[1]}" + assert ( + f"PyArrow FileIO overriding S3 bucket region for bucket {bucket_region[0]}: " + f"provided region {user_provided_region}, actual region {bucket_region[1]}" in caplog.text + ) # For oss scheme, user provided region is used instead assert pyarrow_file_io.new_input(f"oss://{bucket_region[0]}/path/to/file")._filesystem.region == user_provided_region
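
Taken together, the series lets a single `PyArrowFileIO` serve buckets in different regions: `s3.region` acts only as a fallback, each bucket's region is resolved and cached on first use, and `ArrowScan` picks a filesystem per file path. A rough usage sketch; the bucket name is a placeholder, the call needs credentials and network access to S3, and `_filesystem` is the private attribute the unit tests above also inspect:

    from pyiceberg.io.pyarrow import PyArrowFileIO

    io = PyArrowFileIO(properties={"s3.region": "us-west-2"})  # fallback region only

    # new_input resolves (and caches) the bucket's actual region before building the S3FileSystem.
    inp = io.new_input("s3://some-bucket-in-another-region/data/file.parquet")  # placeholder bucket
    print(inp._filesystem.region)  # the bucket's resolved region, not necessarily "us-west-2"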