[Bug] Cannot use PyIceberg with multiple FS #1041

Closed
kevinjqliu opened this issue Aug 11, 2024 · 20 comments

@kevinjqliu
Contributor

kevinjqliu commented Aug 11, 2024

Apache Iceberg version

main (development)

Please describe the bug 🐞

PyIceberg assumes the same FS implementation is used for reading both metadata and data.
However, I want to use a catalog with local FS as the warehouse while referencing S3 files as data.

See this example Jupyter notebook to reproduce

Problem

The fs implementation is determined by the metadata location and is then passed down to the function that reads the data file:

scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
if isinstance(io, PyArrowFileIO):
    fs = io.fs_by_scheme(scheme, netloc)

Possible solution

Determine the fs implementation based on the file path of the current file.
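
A minimal sketch of that idea, assuming fs_by_scheme keeps its current signature and using data_file.file_path as a stand-in for whichever file is about to be read:

# Sketch: derive the filesystem from the data file's own location,
# not from the table's metadata location.
scheme, netloc, _ = PyArrowFileIO.parse_location(data_file.file_path)
if isinstance(io, PyArrowFileIO):
    fs = io.fs_by_scheme(scheme, netloc)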

@Fokko
Contributor

Fokko commented Aug 20, 2024

This is a good point; I've heard that folks store their metadata on HDFS and the data itself on S3.

I don't think the add-files example is the best one; it would be better to support the write.data.path and write.metadata.path configuration.

@kevinjqliu
Contributor Author

Oh interesting, thanks!
Here's the config definition for write.data.path and write.metadata.path
https://iceberg.apache.org/docs/latest/configuration/#write-properties

I also found an old issue referencing this: #161

Another example usage is combining local FS and S3 (MinIO), which might be easier to set up and test against.
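
For illustration, a sketch of what a table with split locations could look like once those properties are honored (the catalog, schema, and paths below are hypothetical):

# Sketch: keep data files on S3/MinIO and metadata on the local filesystem
# via the write properties from the Iceberg docs linked above.
table = catalog.create_table(
    identifier="default.events",
    schema=schema,
    properties={
        "write.data.path": "s3://warehouse-bucket/events/data",
        "write.metadata.path": "file:///tmp/warehouse/events/metadata",
    },
)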

@TiansuYu
Contributor

TiansuYu commented Sep 2, 2024

I will have a look at this issue.

@TiansuYu
Contributor

TiansuYu commented Sep 2, 2024

A general question: why are we implementing a custom

def _infer_file_io_from_scheme(path: str, properties: Properties) -> Optional[FileIO]:
    parsed_url = urlparse(path)
    if parsed_url.scheme:
        if file_ios := SCHEMA_TO_FILE_IO.get(parsed_url.scheme):
            for file_io_path in file_ios:
                if file_io := _import_file_io(file_io_path, properties):
                    return file_io
        else:
            warnings.warn(f"No preferred file implementation for scheme: {parsed_url.scheme}")
    return None

SCHEMA_TO_FILE_IO: Dict[str, List[str]] = {
    "s3": [ARROW_FILE_IO, FSSPEC_FILE_IO],
    "s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
    "s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
    "gs": [ARROW_FILE_IO],
    "file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
    "hdfs": [ARROW_FILE_IO],
    "viewfs": [ARROW_FILE_IO],
    "abfs": [FSSPEC_FILE_IO],
    "abfss": [FSSPEC_FILE_IO],
}

instead of using fsspec.core.url_to_fs(file_path)[0] directly?

(On a side note: this looks a bit confusing to me; why is FSSPEC_FILE_IO not added as a viable IO method for gs, given that we have added gcsfs to the extras?)

(Also, I am not sure why pyarrow is not using fsspec as the IO layer but implements things on its own.)

EDIT:
https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.url_to_fs is a slightly better choice than fsspec.core.get_fs_token_paths
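
For reference, a minimal sketch of resolving a filesystem with url_to_fs (the path is made up; a cloud scheme such as s3:// would additionally need s3fs installed):

import fsspec

# url_to_fs picks the filesystem implementation from the URL scheme and
# returns it together with the path stripped of the scheme.
fs, path = fsspec.core.url_to_fs("file:///tmp/warehouse/data/part-0.parquet")
print(type(fs).__name__, path)  # e.g. LocalFileSystem /tmp/warehouse/data/part-0.parquet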

@TiansuYu
Contributor

TiansuYu commented Sep 2, 2024

Read my comment here for the cause of the issue.

I don't think fixing SqlCatalog alone is the proper answer to this bug. The io layer seems ill-written to me and has to be fixed somewhere at a higher level (e.g. FsspecInputFile or InputFile).

Let me know what you think, then we can come up with a way to properly address this. 😄

@kevinjqliu
Contributor Author

Thanks for taking a look at this @TiansuYu

why are we implementing a custom

I think custom scheme parsing avoids picking one library over another (fsspec vs pyarrow). fsspec.core.get_fs_token_paths seems like an interesting replacement when using fsspec.

(On a side note: this looks a bit confusing to me; why is FSSPEC_FILE_IO not added as a viable IO method for gs, given that we have added gcsfs to the extras?)

Good catch, fsspec should be included since GCS is supported:

def _gs(properties: Properties) -> AbstractFileSystem:
    # https://gcsfs.readthedocs.io/en/latest/api.html#gcsfs.core.GCSFileSystem
    from gcsfs import GCSFileSystem

    return GCSFileSystem(
        project=properties.get(GCS_PROJECT_ID),
        access=properties.get(GCS_ACCESS, "full_control"),
        token=properties.get(GCS_TOKEN),
        consistency=properties.get(GCS_CONSISTENCY, "none"),
        cache_timeout=properties.get(GCS_CACHE_TIMEOUT),
        requester_pays=property_as_bool(properties, GCS_REQUESTER_PAYS, False),
        session_kwargs=json.loads(properties.get(GCS_SESSION_KWARGS, "{}")),
        endpoint_url=properties.get(GCS_ENDPOINT),
        default_location=properties.get(GCS_DEFAULT_LOCATION),
        version_aware=property_as_bool(properties, GCS_VERSION_AWARE, False),
    )

(Also, I am not sure why pyarrow is not using fsspec as the IO layer but implements things on its own.)

pyarrow uses its own native fs implementations:

def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
    if scheme in {"s3", "s3a", "s3n"}:
        from pyarrow.fs import S3FileSystem

        client_kwargs: Dict[str, Any] = {
            "endpoint_override": self.properties.get(S3_ENDPOINT),
            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
        }

        if proxy_uri := self.properties.get(S3_PROXY_URI):
            client_kwargs["proxy_options"] = proxy_uri

        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
            client_kwargs["connect_timeout"] = float(connect_timeout)

        return S3FileSystem(**client_kwargs)
    elif scheme in ("hdfs", "viewfs"):
        from pyarrow.fs import HadoopFileSystem

        hdfs_kwargs: Dict[str, Any] = {}
        if netloc:
            return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
        if host := self.properties.get(HDFS_HOST):
            hdfs_kwargs["host"] = host
        if port := self.properties.get(HDFS_PORT):
            # port should be an integer type
            hdfs_kwargs["port"] = int(port)
        if user := self.properties.get(HDFS_USER):
            hdfs_kwargs["user"] = user
        if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
            hdfs_kwargs["kerb_ticket"] = kerb_ticket

        return HadoopFileSystem(**hdfs_kwargs)
    elif scheme in {"gs", "gcs"}:
        from pyarrow.fs import GcsFileSystem

        gcs_kwargs: Dict[str, Any] = {}
        if access_token := self.properties.get(GCS_TOKEN):
            gcs_kwargs["access_token"] = access_token
        if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
            gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
        if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
            gcs_kwargs["default_bucket_location"] = bucket_location
        if endpoint := self.properties.get(GCS_ENDPOINT):
            url_parts = urlparse(endpoint)
            gcs_kwargs["scheme"] = url_parts.scheme
            gcs_kwargs["endpoint_override"] = url_parts.netloc

        return GcsFileSystem(**gcs_kwargs)
    elif scheme == "file":
        return PyArrowLocalFileSystem()
    else:
        raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

@kevinjqliu
Contributor Author

I don't think fixing SqlCatalog alone is the proper answer to this bug. The io layer seems ill-written to me and has to be fixed somewhere at a higher level (e.g. FsspecInputFile or InputFile).

Yea, the main issue is the assumption that the same io (and fs implementation) is used for reading both data and metadata files. The example you pointed to passes in the io parameter:

data_files = _parquet_files_to_data_files(
    table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
)

Instead, we would want to recreate io/fs based on the file currently being processed.

Here's another example of passing in the io parameter on the write path:

data_files = _dataframe_to_data_files(
    table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
)

@kevinjqliu
Contributor Author

Generally, this problem should go away if we re-evaluate fs and io each time a file is read or written. In other words, we should stop passing the io parameter around.
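
A rough sketch of that direction, assuming load_file_io keeps its current signature of taking properties plus a location (the helper name below is hypothetical):

from pyiceberg.io import InputFile, load_file_io
from pyiceberg.table import Table

# Hypothetical helper: resolve a FileIO from the file's own location instead of
# reusing the io that was created from the table's metadata location.
def input_file_for(path: str, table: Table) -> InputFile:
    io = load_file_io(properties=table.properties, location=path)
    return io.new_input(path)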

@TiansuYu
Contributor

TiansuYu commented Sep 2, 2024

@kevinjqliu I think resolving fs at the file level should make the API cleaner. For example, if no file_system is given to FsspecInputFile (or similarly PyarrowInputFile), then we resolve it at the file level.

I would say one benefit of setting fs at the table level is reusing that fs instance for a performance boost. If we want to keep this, I would say we need two io configs, one for metadata and one for data, at the MetastoreCatalog or Catalog level.

@kevinjqliu
Contributor Author

My preference is resolving fs at the file level. It's more flexible and the performance difference should be negligible.
Another reason is to be able to write data across clouds. Technically, I can write to multiple clouds, across AWS, Azure, and GCP.

@TiansuYu
Contributor

TiansuYu commented Sep 2, 2024

I will make a PR according to this:

My preference is resolving fs at the file level. It's more flexible and the performance difference should be negligible.
Another reason is to be able to write data across clouds. Technically, I can write to multiple clouds, across AWS, Azure, and GCP.

@TiansuYu
Contributor

TiansuYu commented Sep 2, 2024

Also, reading here:
https://arrow.apache.org/docs/python/filesystems.html#using-arrow-filesystems-with-fsspec

There might be an opportunity to simplify the split between the arrow and fsspec file systems.
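
For example, an fsspec filesystem can be wrapped so that pyarrow accepts it (a sketch using the local filesystem, so it runs without cloud credentials):

import fsspec
from pyarrow.fs import FSSpecHandler, PyFileSystem

# Wrap an fsspec filesystem so it can be passed wherever pyarrow expects a FileSystem.
arrow_fs = PyFileSystem(FSSpecHandler(fsspec.filesystem("file")))
print(arrow_fs.get_file_info("/tmp"))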

@kevinjqliu
Contributor Author

kevinjqliu commented Sep 2, 2024

yep! There are definitely opportunities to consolidate the two. I opened #310 with some details.

@TiansuYu
Contributor

TiansuYu commented Sep 2, 2024

Reading the table spec, I just realised that there is a field location in https://iceberg.apache.org/spec/#table-metadata-fields that specifies a base location of the table. Does Iceberg Java actually allow splitting the metadata and data locations?

@kevinjqliu
Contributor Author

It's configurable via the write properties. See this comment: #1041 (comment)

@Sairam90

Sairam90 commented Dec 20, 2024

Any updates on this issue? I face a similar issue when creating a table on S3 as well.
FYI, I think this prevents making pyarrow the default io system:

FSSPEC_FILE_IO = "pyiceberg.io.fsspec.FsspecFileIO"
catalog = load_catalog(name='glue', **{'type': 'glue', 'py-io-impl': FSSPEC_FILE_IO})

@jiakai-li
Contributor

Hey guys, I can pick this up together with #1279 if no one is currently working on this.

@kevinjqliu
Contributor Author

Assigned to you, @jiakai-li.

@jiakai-li
Contributor

jiakai-li commented Jan 7, 2025

@kevinjqliu I guess we can close this issue and #1279 now? In the meantime, I'm keen to work on write.data.path and write.metadata.path if that's something we want to enable and no one else is currently working on it?

@kevinjqliu
Contributor Author

kevinjqliu commented Jan 7, 2025

@jiakai-li we can close this issue! Fixed by #1453

In the meantime, I'm keen to work on write.data.path and write.metadata.path if that's something we want to enable and no one else is currently working on it?

Let's open a new issue for those: #1492
