You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I implemented a custom reader for HSC PDR2 data, which converts a FITS bool-array to pa.list_(pa.bool_(), 274).
On the reducing stage this ends with an error from pyarrow-pandas conversion layer, originated from hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards(), where we convert merged pyarrow table to pandas.
I tried to reproduce this problem with a simple pyarrow table including fixed-list column, but this code does work:
Note this types_mapper argument here, it works even without this, but produces object dtype, I believe that we also should use it, but it may be another issue. I also tried to modify reduce_pixel_shards() to use this argument, but it produces the same error.
Details are under the spoilers:
Reader implementation
importnumpyasnpimportpandasaspdimportpyarrowaspafromastropy.tableimportTablefromhipscat_import.catalog.file_readersimportFitsReaderclassHSCFitsReader(FitsReader):
"""FITS reader that converts ra and dec from radians to degrees"""def__init__(self, *args, ra_column, dec_column, **kwargs):
super().__init__(*args, **kwargs)
self.ra_column=ra_columnself.dec_column=dec_columndefread(self, input_file, read_columns=None):
# Mostly copy-pasted from the hipscat-import implementationself.regular_file_exists(input_file, **self.kwargs)
table=Table.read(input_file, memmap=True, **self.kwargs)
ifread_columns:
table.keep_columns(read_columns)
elifself.column_names:
table.keep_columns(self.column_names)
elifself.skip_column_names:
table.remove_columns(self.skip_column_names)
total_rows=len(table)
read_rows=0whileread_rows<total_rows:
chunk=table[read_rows : read_rows+self.chunksize]
df_chunk=self.astropy_table_to_df(chunk)
yielddf_chunkread_rows+=self.chunksizedefastropy_table_to_df(self, table):
"""Data convertion, spoils input table"""# Flags wouldn't convert to pandas due to astropy's implementation limitations# So we convert them manually and delete from the tableif'flags'intable.columns:
flags=table['flags']
flags_length=flags.shape[1]
flags_pyarrow=pa.array(flags.tolist(), type=pa.list_(pa.bool_(), flags_length))
table.remove_column('flags')
df=table.to_pandas()
# Convert coords from radians to degreesdf[self.ra_column] =np.degrees(df[self.ra_column])
df[self.dec_column] =np.degrees(df[self.dec_column])
# Assign flagsif'flags'intable.columns:
df['flags'] =pd.Series(flags_pyarrow, dtype=pd.ArrowDtype(flags_pyarrow.type))
returndf
Traceback
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5284, in pyarrow.lib.type_for_alias()
5283 try:
-> 5284 alias = _type_aliases[name]
5285 except KeyError:
KeyError: 'fixed_size_list<item: bool>[274]'
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2251, in construct_from_string()
2250 try:
-> 2251 pa_dtype = pa.type_for_alias(base_type)
2252 except ValueError as err:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5286, in pyarrow.lib.type_for_alias()
5285 except KeyError:
-> 5286 raise ValueError('No type alias for {0}'.format(name))
5287
ValueError: No type alias for fixed_size_list<item: bool>[274]
The above exception was the direct cause of the following exception:
NotImplementedError Traceback (most recent call last)
Cell In[3], line 29
25 with Client(n_workers=64) as client:
26 # import dask
27 # with dask.config.set(scheduler='single-threaded'), Client(processes=False, threads_per_worker=1, n_workers=1) as client:
28 display(client)
---> 29 pipeline_with_client(args, client)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:60, in pipeline_with_client(args, client)
58 if args.completion_email_address:
59 _send_failure_email(args, exception)
---> 60 raise exception
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:44, in pipeline_with_client(args, client)
41 raise ValueError("args is required and should be subclass of RuntimeArguments")
43 if isinstance(args, ImportArguments):
---> 44 catalog_runner.run(args, client)
45 elif isinstance(args, IndexArguments):
46 index_runner.run(args, client)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/run_import.py:111, in run(args, client)
85 for (
86 destination_pixel,
87 source_pixel_count,
88 destination_pixel_key,
89 ) in args.resume_plan.get_reduce_items():
90 futures.append(
91 client.submit(
92 mr.reduce_pixel_shards,
(...)
108 )
109 )
--> 111 args.resume_plan.wait_for_reducing(futures)
113 # All done - write out the metadata
114 with args.resume_plan.print_progress(total=5, stage_name="Finishing") as step_progress:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/resume_plan.py:298, in ResumePlan.wait_for_reducing(self, futures)
296 def wait_for_reducing(self, futures):
297 """Wait for reducing futures to complete."""
--> 298 self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True)
299 remaining_reduce_items = self.get_reduce_items()
300 if len(remaining_reduce_items) > 0:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline_resume_plan.py:143, in PipelineResumePlan.wait_for_futures(self, futures, stage_name, fail_fast)
141 some_error = True
142 if fail_fast:
--> 143 raise future.exception()
145 if some_error:
146 raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/distributed/worker.py:3000, in apply_function_simple()
2995 with (
2996 context_meter.meter("thread-noncpu", func=time) as m,
2997 context_meter.meter("thread-cpu", func=thread_time),
2998 ):
2999 try:
-> 3000 result = function(*args, **kwargs)
3001 except (SystemExit, KeyboardInterrupt):
3002 # Special-case these, just like asyncio does all over the place. They will
3003 # pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually
(...)
3006 # Any other `BaseException` types would ultimately be ignored by asyncio if
3007 # raised here, after messing up the worker state machine along their way.
3008 raise
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:331, in reduce_pixel_shards()
326 except Exception as exception: # pylint: disable=broad-exception-caught
327 print_task_failure(
328 f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
329 exception,
330 )
--> 331 raise exception
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards()
280 if rows_written != destination_pixel_size:
281 raise ValueError(
282 "Unexpected number of objects at pixel "
283 f"({healpix_pixel})."
284 f" Expected {destination_pixel_size}, wrote {rows_written}"
285 )
--> 287 dataframe = merged_table.to_pandas()
288 if sort_columns:
289 dataframe = dataframe.sort_values(sort_columns.split(","))
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/array.pxi:885, in pyarrow.lib._PandasConvertible.to_pandas()
883 coerce_temporal_nanoseconds=coerce_temporal_nanoseconds
884 )
--> 885 return self._to_pandas(options, categories=categories,
886 ignore_metadata=ignore_metadata,
887 types_mapper=types_mapper)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/table.pxi:5002, in pyarrow.lib.Table._to_pandas()
5000 types_mapper=None):
5001 from pyarrow.pandas_compat import table_to_dataframe
-> 5002 df = table_to_dataframe(
5003 options, self, categories,
5004 ignore_metadata=ignore_metadata,
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:774, in table_to_dataframe()
771 table = _add_any_metadata(table, pandas_metadata)
772 table, index = _reconstruct_index(table, index_descriptors,
773 all_columns, types_mapper)
--> 774 ext_columns_dtypes = _get_extension_dtypes(
775 table, all_columns, types_mapper)
776 else:
777 index = _pandas_api.pd.RangeIndex(table.num_rows)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:853, in _get_extension_dtypes()
848 dtype = col_meta['numpy_type']
850 if dtype not in _pandas_supported_numpy_types:
851 # pandas_dtype is expensive, so avoid doing this for types
852 # that are certainly numpy dtypes
--> 853 pandas_dtype = _pandas_api.pandas_dtype(dtype)
854 if isinstance(pandas_dtype, _pandas_api.extension_dtype):
855 if hasattr(pandas_dtype, "__from_arrow__"):
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:148, in pyarrow.lib._PandasAPIShim.pandas_dtype()
146 return self._pd.lib.infer_dtype(obj)
147
--> 148 cpdef pandas_dtype(self, dtype):
149 self._check_import()
150 try:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:151, in pyarrow.lib._PandasAPIShim.pandas_dtype()
149 self._check_import()
150 try:
--> 151 return self._types_api.pandas_dtype(dtype)
152 except AttributeError:
153 return None
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/common.py:1624, in pandas_dtype()
1621 return dtype
1623 # registered extension types
-> 1624 result = registry.find(dtype)
1625 if result is not None:
1626 if isinstance(result, type):
1627 # GH 31356, GH 54592
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/base.py:576, in find()
574 for dtype_type in self.dtypes:
575 try:
--> 576 return dtype_type.construct_from_string(dtype)
577 except TypeError:
578 pass
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2262, in construct_from_string()
2258 except (NotImplementedError, ValueError):
2259 # Fall through to raise with nice exception message below
2260 pass
-> 2262 raise NotImplementedError(
2263 "Passing pyarrow type specific parameters "
2264 f"({has_parameters.group()}) in the string is not supported. "
2265 "Please construct an ArrowDtype object with a pyarrow_dtype "
2266 "instance with specific parameters."
2267 ) from err
2268 raise TypeError(f"'{base_type}' is not a valid pyarrow data type.") from err
2269 return cls(pa_dtype)
NotImplementedError: Passing pyarrow type specific parameters ([274]) in the string is not supported. Please construct an ArrowDtype object with a pyarrow_dtype instance with specific parameters.
Before submitting
Please check the following:
I have described the situation in which the bug arose, including what code was executed, information about my environment, and any applicable data others will need to reproduce the problem.
I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a descriprion of what I expected instead.
If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list.
The text was updated successfully, but these errors were encountered:
Bug report
I implemented a custom reader for HSC PDR2 data, which converts a FITS bool-array to
pa.list_(pa.bool_(), 274)
.On the reducing stage this ends with an error from pyarrow-pandas conversion layer, originated from
hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards()
, where we convert merged pyarrow table to pandas.I tried to reproduce this problem with a simple pyarrow table including fixed-list column, but this code does work:
Note this types_mapper argument here, it works even without this, but produces object dtype, I believe that we also should use it, but it may be another issue. I also tried to modify
reduce_pixel_shards()
to use this argument, but it produces the same error.Details are under the spoilers:
Reader implementation
Traceback
Before submitting
Please check the following:
The text was updated successfully, but these errors were encountered: