Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix read from multiple s3 regions #1453

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

jiakai-li
Copy link
Contributor

@jiakai-li
Copy link
Contributor Author

jiakai-li commented Dec 20, 2024

Hey @kevinjqliu , I hope you are ready for the Christmas time :-)

After some investigation, I noticed the PyArrowFileIO._initialize_fs function doesn't take netloc parameter into account when initialize the S3FileSystem. Instead, it always uses the region found in properties. I modified the precedence order to make netloc take higher priority compare to the region saved in properties.

Would be keen to get some quick feedback, and will add more unit test if this sounds a fix on the correct track? Thanks!

@jiakai-li jiakai-li marked this pull request as draft December 20, 2024 01:59
@kevinjqliu
Copy link
Contributor

@jiakai-li Thanks for working on this! And happy holidays :)

I noticed the PyArrowFileIO._initialize_fs function doesn't take netloc parameter into account when initialize the S3FileSystem

looking through the usage for _initialize_fs, it doesnt look like netloc is used at all.

it always uses the region found in properties

I think that's one of the problems we need to tackle. The current S3 configuration requires a specific "region" to be set. This assumes that all data and metadata files are from the same region as the one specified. But what if i have some files in one region and some in another?

I think a potential solution might be to omit the "region" property and allow the S3FileSystem to determine the proper region using resolve_s3_region. This is recommended in the S3FileSystem docs for region.

Another potential issue is the way we cache fs, it assumes that there's only one fs per scheme. With the region approach above, we break this assumption.

@kevinjqliu
Copy link
Contributor

BTW theres a similar issue in #1041

@jiakai-li
Copy link
Contributor Author

Thank you @kevinjqliu , just try to clear my head a little bit

I think a potential solution might be to omit the "region" property and allow the S3FileSystem to determine the proper region using resolve_s3_region. This is recommended in the S3FileSystem docs for region.

Is the change I made in accordance with this option? What I've done essentially is using the netloc to determine the bucket region. Only in case when, for some reason, the region cannot be determined then we fall back to the properties configuration.

Another potential issue is the way we cache fs, it assumes that there's only one fs per scheme. With the region approach above, we break this assumption.

Please correct me if I miss something for how the fs cache works. But here is my understanding:

I see we use lru_cache, so it should cache one fs for each different bucket since they will have different netloc and thus a different key in the cache. Previously, it looks like we only have one cached fs. It seems relates to the netloc not being used. As a result, netloc is not connected with the client_kwargs["region"] configuration. In this case, even two cache keys point to two fs instances, the two fs instances are still of the same region (the one configured in properties).

I think solving the netloc issue will also resolve the cache issue as the lru_cache key now links with the region and will return the correct instance.

@jiakai-li
Copy link
Contributor Author

BTW theres a similar issue in #1041

Can I tackle on this issue as well if there is no one working on it?

@kevinjqliu
Copy link
Contributor

Is the change I made in accordance with this option? What I've done essentially is using the netloc to determine the bucket region. Only in case when, for some reason, the region cannot be determined then we fall back to the properties configuration.

Im dont think netloc can be used to determine the region. S3 URI scheme doesn't use netloc, only S3 URL does.
For example, heres how fs_by_scheme is typically used

scheme, netloc, path = self.parse_location(location)
return PyArrowFile(
fs=self.fs_by_scheme(scheme, netloc),

and running an example S3 URI:

from pyiceberg.io.pyarrow import PyArrowFileIO
scheme, netloc, path = PyArrowFileIO.parse_location("s3://a/b/c/1.txt")
# returns ('s3', 'a', 'a/b/c/1.txt')

In order to support multiple regions, we might need to call resolve_s3_region first and pass the region to fs_by_scheme. If you look at it from S3FileSystem's perspective, we need a new S3FileSystem object per region. This relates to how the FileSystem is cached.

BTW a good test scenario can be a table where my metadata files are stored in one bucket while my data files are stored in another. We might be able to construct this test case by modifying the minio settings to create different regional buckets; I haven't tested this yet.

@kevinjqliu
Copy link
Contributor

Can I tackle on this issue as well if there is no one working on it?

I don't think anyone's working on it right now, feel free to pick it up.

@jiakai-li
Copy link
Contributor Author

jiakai-li commented Dec 20, 2024

Thank you @kevinjqliu , can I have some more guidance on this please?

Im dont think netloc can be used to determine the region. S3 URI scheme doesn't use netloc, only S3 URL does.

I did some search and seems in terms of s3 scheme, the format is s3://<bucket-name>/<key-name>. The netloc parsed from urlparse (essentially passed to the _initialize_fs call) then points to the bucket-name.

In the below example, I would expect 'a' to be netloc and also the bucket-name. Is there an exception that doesn't follow this format?

from pyiceberg.io.pyarrow import PyArrowFileIO
scheme, netloc, path = PyArrowFileIO.parse_location("s3://a/b/c/1.txt")
# returns ('s3', 'a', 'a/b/c/1.txt')

BTW a good test scenario can be a table where my metadata files are stored in one bucket while my data files are stored in another. We might be able to construct this test case by modifying the minio settings to create different regional buckets; I haven't tested this yet.

Yep, I tested the change using a similar scenario locally with my own handcrafted s3 files. But will add more proper test cases as I make more progress. Thanks again!

@kevinjqliu
Copy link
Contributor

I did some search and seems in terms of s3 scheme, the format is s3:///. The netloc parsed from urlparse (essentially passed to the _initialize_fs call) then points to the bucket-name.

ah yes, you're right. sorry for the confusion. I was thinking of something else.
Using netloc, we're essentially mapping S3 bucket to its region. And this is fine because each bucket should belong to a specific region; perhaps we can cache this.
This makes sense to me!

@kevinjqliu
Copy link
Contributor

BTW there are 2 FileIO implementations, one for pyarrow, another for fsspec.

We might want to do the same for fsspec

def _s3(properties: Properties) -> AbstractFileSystem:
from s3fs import S3FileSystem
client_kwargs = {
"endpoint_url": properties.get(S3_ENDPOINT),
"aws_access_key_id": get_first_property_value(properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
"aws_secret_access_key": get_first_property_value(properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
"aws_session_token": get_first_property_value(properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
"region_name": get_first_property_value(properties, S3_REGION, AWS_REGION),

@jiakai-li
Copy link
Contributor Author

Sweet, I'll go ahead with this approach then. Thanks very much @kevinjqliu !

pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
pyiceberg/io/pyarrow.py Show resolved Hide resolved
pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
tests/io/test_pyarrow.py Show resolved Hide resolved
@jiakai-li
Copy link
Contributor Author

jiakai-li commented Dec 23, 2024

BTW there are 2 FileIO implementations, one for pyarrow, another for fsspec.

We might want to do the same for fsspec

def _s3(properties: Properties) -> AbstractFileSystem:
from s3fs import S3FileSystem
client_kwargs = {
"endpoint_url": properties.get(S3_ENDPOINT),
"aws_access_key_id": get_first_property_value(properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
"aws_secret_access_key": get_first_property_value(properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
"aws_session_token": get_first_property_value(properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
"region_name": get_first_property_value(properties, S3_REGION, AWS_REGION),

Hi @kevinjqliu , for the above concern, I tested it locally and also did some investigation. According to what I found here seems fsspec doesn't have the same issue as pyarrow. So I guess we can leave it?

@kevinjqliu
Copy link
Contributor

According to what I found here seems fsspec doesn't have the same issue as pyarrow. So I guess we can leave it?

wow thats interesting, i didn't know about that. I like that solution :) Hopefully pyarrow fs will have this feature one day

Whenever a new bucket is used, it will first find out which region it belongs and then use the client for that region.

Copy link
Contributor

@kevinjqliu kevinjqliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some more comments, thanks for working on this!

pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
pyiceberg/io/pyarrow.py Show resolved Hide resolved
tests/io/test_pyarrow.py Outdated Show resolved Hide resolved
tests/io/test_pyarrow.py Show resolved Hide resolved
tests/io/test_pyarrow.py Show resolved Hide resolved
@jiakai-li
Copy link
Contributor Author

jiakai-li commented Dec 24, 2024

This PR is ready for review now. Thanks very much and merry christmas! Please let me know if any further change is required.

@jiakai-li jiakai-li marked this pull request as ready for review December 24, 2024 02:23
@@ -362,6 +362,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
"region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
}

# Override the default s3.region if netloc(bucket) resolves to a different region
try:
client_kwargs["region"] = resolve_s3_region(netloc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about doing this lookup only when the region is not provided explicitly? I think this will do another call to S3.

Copy link
Contributor Author

@jiakai-li jiakai-li Dec 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Fokko, my understanding is that the problem occurs when the provided region doesn't match the data file bucket region, and that will fail the file read for pyarrow. And by overwriting the bucket region (fall back to provided region), we make sure the real bucket region that a data file is stored takes precedence. (this function is cached when using fs_by_scheme, so it will be called only for new bucket that's not resolved previously to save calls to S3)

try:
client_kwargs["region"] = resolve_s3_region(netloc)
except (OSError, TypeError):
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we emit a warning here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants