Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rust): properly read/write fixed-sized lists from/to parquet files #16747

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/polars-arrow/src/array/fixed_size_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl FixedSizeListArray {
}

/// Alias to `Self::try_new(...).unwrap()`
#[track_caller]
pub fn new(data_type: ArrowDataType, values: Box<dyn Array>, validity: Option<Bitmap>) -> Self {
Self::try_new(data_type, values, validity).unwrap()
}
Expand Down
18 changes: 18 additions & 0 deletions crates/polars-arrow/src/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ impl<O: Offset> Offsets<O> {
}
}

/// Returns a `length` corresponding to the position `index`
/// # Panic
/// This function panics iff `index >= self.len()`
#[inline]
pub fn length_at(&self, index: usize) -> usize {
let (start, end) = self.start_end(index);
end - start
}

/// Returns a range (start, end) corresponding to the position `index`
/// # Panic
/// This function panics iff `index >= self.len()`
Expand Down Expand Up @@ -434,6 +443,15 @@ impl<O: Offset> OffsetsBuffer<O> {
}
}

/// Returns a `length` corresponding to the position `index`
/// # Panic
/// This function panics iff `index >= self.len()`
#[inline]
pub fn length_at(&self, index: usize) -> usize {
let (start, end) = self.start_end(index);
end - start
}

/// Returns a range (start, end) corresponding to the position `index`
/// # Panic
/// This function panics iff `index >= self.len()`
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub async fn fetch_metadata(
file_byte_length
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
.ok_or_else(|| {
polars_parquet::parquet::error::Error::OutOfSpec(
polars_parquet::parquet::error::ParquetError::OutOfSpec(
"not enough bytes to contain parquet footer".to_string(),
)
})?..file_byte_length,
Expand All @@ -136,13 +136,13 @@ pub async fn fetch_metadata(
let magic = read_n(reader).unwrap();
debug_assert!(reader.is_empty());
if magic != polars_parquet::parquet::PARQUET_MAGIC {
return Err(polars_parquet::parquet::error::Error::OutOfSpec(
return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
"incorrect magic in parquet footer".to_string(),
)
.into());
}
footer_byte_size.try_into().map_err(|_| {
polars_parquet::parquet::error::Error::OutOfSpec(
polars_parquet::parquet::error::ParquetError::OutOfSpec(
"negative footer byte length".to_string(),
)
})?
Expand All @@ -154,7 +154,7 @@ pub async fn fetch_metadata(
file_byte_length
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
.ok_or_else(|| {
polars_parquet::parquet::error::Error::OutOfSpec(
polars_parquet::parquet::error::ParquetError::OutOfSpec(
"not enough bytes to contain parquet footer".to_string(),
)
})?..file_byte_length,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, Optio
use super::utils::*;
use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::{delta_bitpacked, delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{split_buffer, DataPage};
use crate::read::deserialize::utils::{page_is_filtered, page_is_optional};
use crate::read::ParquetError;

pub(crate) type BinaryDict = BinaryArray<i64>;

Expand All @@ -20,7 +20,7 @@ pub(crate) struct Required<'a> {

impl<'a> Required<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let values = BinaryIter::new(values).take(page.num_values());

Ok(Self { values })
Expand All @@ -39,15 +39,15 @@ pub(crate) struct Delta<'a> {

impl<'a> Delta<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let mut lengths_iter = delta_length_byte_array::Decoder::try_new(values)?;

#[allow(clippy::needless_collect)] // we need to consume it to get the values
let lengths = lengths_iter
.by_ref()
.map(|x| x.map(|x| x as usize))
.collect::<Result<Vec<_>, ParquetError>>()?;
.collect::<ParquetResult<Vec<_>>>()?;

let values = lengths_iter.into_values();
Ok(Self {
Expand Down Expand Up @@ -88,7 +88,7 @@ pub(crate) struct DeltaBytes<'a> {

impl<'a> DeltaBytes<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let mut decoder = delta_bitpacked::Decoder::try_new(values)?;
let prefix = (&mut decoder)
.take(page.num_values())
Expand Down Expand Up @@ -329,7 +329,7 @@ pub(crate) fn build_binary_state<'a>(
))
},
(Encoding::Plain, _, true, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Expand All @@ -343,7 +343,7 @@ pub(crate) fn build_binary_state<'a>(
Ok(BinaryState::FilteredRequired(FilteredRequired::new(page)))
},
(Encoding::Plain, _, true, true) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

Ok(BinaryState::FilteredOptional(
FilteredOptionalPageValidity::try_new(page)?,
Expand Down Expand Up @@ -409,14 +409,14 @@ pub(crate) fn build_nested_state<'a>(
ValuesDictionary::try_new(page, dict).map(BinaryNestedState::OptionalDictionary)
},
(Encoding::Plain, _, true, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Ok(BinaryNestedState::Optional(values))
},
(Encoding::Plain, _, false, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct Values<'a>(BitmapIter<'a>);

impl<'a> Values<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

Ok(Self(BitmapIter::new(values, 0, values.len() * 8)))
}
Expand Down Expand Up @@ -54,7 +54,7 @@ struct FilteredRequired<'a> {

impl<'a> FilteredRequired<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
// todo: replace this by an iterator over slices, for faster deserialization
let values = BitmapIter::new(values, 0, page.num_values());

Expand Down Expand Up @@ -138,7 +138,7 @@ impl<'a> Decoder<'a> for BooleanDecoder {
},
(Encoding::Rle, true, false) => {
let optional = OptionalPageValidity::try_new(page)?;
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
// For boolean values the length is pre-pended.
let (_len_in_bytes, values) = values.split_at(4);
let iter = hybrid_rle::Decoder::new(values, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {

match (page.encoding(), is_optional, is_filtered) {
(Encoding::Plain, true, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Optional(values))
},
(Encoding::Plain, false, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Required(values))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(super) struct Optional<'a> {

impl<'a> Optional<'a> {
pub(super) fn try_new(page: &'a DataPage, size: usize) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = values.chunks_exact(size);

Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'a> Decoder<'a> for BinaryDecoder {
FilteredRequired::new(page, self.size),
)),
(Encoding::Plain, _, true, true) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

Ok(State::FilteredOptional(
FilteredOptionalPageValidity::try_new(page)?,
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn create_list(
nested: &mut NestedState,
values: Box<dyn Array>,
) -> Box<dyn Array> {
let (mut offsets, validity) = nested.nested.pop().unwrap().inner();
let (mut offsets, validity) = nested.nested.pop().unwrap().take();
match data_type.to_logical_type() {
ArrowDataType::List(_) => {
offsets.push(values.len() as i64);
Expand Down Expand Up @@ -89,7 +89,7 @@ pub fn create_map(
nested: &mut NestedState,
values: Box<dyn Array>,
) -> Box<dyn Array> {
let (mut offsets, validity) = nested.nested.pop().unwrap().inner();
let (mut offsets, validity) = nested.nested.pop().unwrap().take();
match data_type.to_logical_type() {
ArrowDataType::Map(_, _) => {
offsets.push(values.len() as i64);
Expand Down
21 changes: 18 additions & 3 deletions crates/polars-parquet/src/arrow/read/deserialize/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ where
dict_read::<$K, _>(iter, init, type_, data_type, num_rows, chunk_size)
})?
},
ArrowDataType::List(inner)
| ArrowDataType::LargeList(inner)
| ArrowDataType::FixedSizeList(inner, _) => {
ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
columns,
Expand All @@ -258,6 +256,23 @@ where
});
Box::new(iter) as _
},
ArrowDataType::FixedSizeList(inner, width) => {
init.push(InitNested::FixedSizeList(field.is_nullable, *width));
let iter = columns_to_iter_recursive(
columns,
types,
inner.as_ref().clone(),
init,
num_rows,
chunk_size,
)?;
let iter = iter.map(move |x| {
let (mut nested, array) = x?;
let array = create_list(field.data_type().clone(), &mut nested, array);
Ok((nested, array))
});
Box::new(iter) as _
},
ArrowDataType::Decimal(_, _) => {
init.push(InitNested::Primitive(field.is_nullable));
let type_ = types.pop().unwrap();
Expand Down
Loading