From 5996d1ec0bf83499736d36ce5a34cb119c8afacb Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Wed, 8 May 2024 18:29:57 +1000 Subject: [PATCH] Revert "feat(rust): Add RLE to `RLE_DICTIONARY` encoder" (#16113) --- .../src/compute/cast/binary_to.rs | 1 - .../src/compute/cast/binview_to.rs | 2 - .../src/compute/cast/primitive_to.rs | 1 - .../polars-arrow/src/compute/cast/utf8_to.rs | 1 - crates/polars-io/src/parquet/write/writer.rs | 2 +- .../src/arrow/write/dictionary.rs | 41 ++- crates/polars-parquet/src/arrow/write/mod.rs | 2 +- .../src/arrow/write/nested/mod.rs | 10 +- .../polars-parquet/src/arrow/write/utils.rs | 6 +- .../parquet/encoding/hybrid_rle/encoder.rs | 289 ++++++------------ .../src/parquet/encoding/hybrid_rle/mod.rs | 4 +- .../tests/it/io/parquet/write/binary.rs | 4 +- .../tests/it/io/parquet/write/primitive.rs | 4 +- py-polars/tests/unit/io/test_parquet.py | 43 --- 14 files changed, 136 insertions(+), 274 deletions(-) diff --git a/crates/polars-arrow/src/compute/cast/binary_to.rs b/crates/polars-arrow/src/compute/cast/binary_to.rs index d5e8bfb30852..c7970fe6a051 100644 --- a/crates/polars-arrow/src/compute/cast/binary_to.rs +++ b/crates/polars-arrow/src/compute/cast/binary_to.rs @@ -139,7 +139,6 @@ pub fn binary_to_dictionary( from: &BinaryArray, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); - array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) diff --git a/crates/polars-arrow/src/compute/cast/binview_to.rs b/crates/polars-arrow/src/compute/cast/binview_to.rs index 1c157110ec49..8c7ef4c2453a 100644 --- a/crates/polars-arrow/src/compute/cast/binview_to.rs +++ b/crates/polars-arrow/src/compute/cast/binview_to.rs @@ -21,7 +21,6 @@ pub(super) fn binview_to_dictionary( from: &BinaryViewArray, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); - array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) @@ -31,7 +30,6 @@ pub(super) fn utf8view_to_dictionary( from: &Utf8ViewArray, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); - array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) diff --git a/crates/polars-arrow/src/compute/cast/primitive_to.rs b/crates/polars-arrow/src/compute/cast/primitive_to.rs index 583b6ab19a96..d0d2056b70de 100644 --- a/crates/polars-arrow/src/compute/cast/primitive_to.rs +++ b/crates/polars-arrow/src/compute/cast/primitive_to.rs @@ -318,7 +318,6 @@ pub fn primitive_to_dictionary( let mut array = MutableDictionaryArray::::try_empty(MutablePrimitiveArray::::from( from.data_type().clone(), ))?; - array.reserve(from.len()); array.try_extend(iter)?; Ok(array.into()) diff --git a/crates/polars-arrow/src/compute/cast/utf8_to.rs b/crates/polars-arrow/src/compute/cast/utf8_to.rs index 85b478c43817..4df2876d394e 100644 --- a/crates/polars-arrow/src/compute/cast/utf8_to.rs +++ b/crates/polars-arrow/src/compute/cast/utf8_to.rs @@ -27,7 +27,6 @@ pub fn utf8_to_dictionary( from: &Utf8Array, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); - array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs index 620ac11c3351..2408d66e9ba2 100644 --- a/crates/polars-io/src/parquet/write/writer.rs +++ b/crates/polars-io/src/parquet/write/writer.rs @@ -102,7 +102,7 @@ where WriteOptions { write_statistics: self.statistics, compression: self.compression, - version: Version::V1, + version: Version::V2, data_pagesize_limit: self.data_page_size, } } diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 0525578589eb..b3ea666865c9 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -1,6 +1,7 @@ use arrow::array::{Array, BinaryViewArray, DictionaryArray, DictionaryKey, Utf8ViewArray}; use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::{ArrowDataType, IntegerType}; +use num_traits::ToPrimitive; use polars_error::{polars_bail, PolarsResult}; use super::binary::{ @@ -15,19 +16,23 @@ use super::primitive::{ use super::{binview, nested, Nested, WriteOptions}; use crate::arrow::read::schema::is_nullable; use crate::arrow::write::{slice_nested_leaf, utils}; -use crate::parquet::encoding::hybrid_rle::encode; +use crate::parquet::encoding::hybrid_rle::encode_u32; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DictPage, Page}; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::{serialize_statistics, ParquetStatistics}; -use crate::write::DynIter; +use crate::write::{to_nested, DynIter, ParquetType}; pub(crate) fn encode_as_dictionary_optional( array: &dyn Array, - nested: &[Nested], type_: PrimitiveType, options: WriteOptions, ) -> Option>>> { + let nested = to_nested(array, &ParquetType::PrimitiveType(type_.clone())) + .ok()? + .pop() + .unwrap(); + let dtype = Box::new(array.data_type().clone()); let len_before = array.len(); @@ -47,11 +52,35 @@ pub(crate) fn encode_as_dictionary_optional( if (array.values().len() as f64) / (len_before as f64) > 0.75 { return None; } + if array.values().len().to_u16().is_some() { + let array = arrow::compute::cast::cast( + array, + &ArrowDataType::Dictionary( + IntegerType::UInt16, + Box::new(array.values().data_type().clone()), + false, + ), + Default::default(), + ) + .unwrap(); + + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + return Some(array_to_pages( + array, + type_, + &nested, + options, + Encoding::RleDictionary, + )); + } Some(array_to_pages( array, type_, - nested, + &nested, options, Encoding::RleDictionary, )) @@ -87,7 +116,7 @@ fn serialize_keys_values( buffer.push(num_bits as u8); // followed by the encoded indices. - Ok(encode::(buffer, keys, num_bits)?) + Ok(encode_u32(buffer, keys, num_bits)?) } else { let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64); @@ -95,7 +124,7 @@ fn serialize_keys_values( buffer.push(num_bits as u8); // followed by the encoded indices. - Ok(encode::(buffer, keys, num_bits)?) + Ok(encode_u32(buffer, keys, num_bits)?) } } diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs index 65e03cecaae4..a980177c4835 100644 --- a/crates/polars-parquet/src/arrow/write/mod.rs +++ b/crates/polars-parquet/src/arrow/write/mod.rs @@ -219,7 +219,7 @@ pub fn array_to_pages( // Only take this path for primitive columns if matches!(nested.first(), Some(Nested::Primitive(_, _, _))) { if let Some(result) = - encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options) + encode_as_dictionary_optional(primitive_array, type_.clone(), options) { return result; } diff --git a/crates/polars-parquet/src/arrow/write/nested/mod.rs b/crates/polars-parquet/src/arrow/write/nested/mod.rs index 9aed392a06ee..46e15eec6c72 100644 --- a/crates/polars-parquet/src/arrow/write/nested/mod.rs +++ b/crates/polars-parquet/src/arrow/write/nested/mod.rs @@ -6,7 +6,7 @@ use polars_error::PolarsResult; pub use rep::num_values; use super::Nested; -use crate::parquet::encoding::hybrid_rle::encode; +use crate::parquet::encoding::hybrid_rle::encode_u32; use crate::parquet::read::levels::get_bit_width; use crate::parquet::write::Version; @@ -41,12 +41,12 @@ fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - match version { Version::V1 => { write_levels_v1(buffer, |buffer: &mut Vec| { - encode::(buffer, levels, num_bits)?; + encode_u32(buffer, levels, num_bits)?; Ok(()) })?; }, Version::V2 => { - encode::(buffer, levels, num_bits)?; + encode_u32(buffer, levels, num_bits)?; }, } @@ -65,10 +65,10 @@ fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - match version { Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { - encode::(buffer, levels, num_bits)?; + encode_u32(buffer, levels, num_bits)?; Ok(()) }), - Version::V2 => Ok(encode::(buffer, levels, num_bits)?), + Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?), } } diff --git a/crates/polars-parquet/src/arrow/write/utils.rs b/crates/polars-parquet/src/arrow/write/utils.rs index 0ba9f4289bab..2032029b2de4 100644 --- a/crates/polars-parquet/src/arrow/write/utils.rs +++ b/crates/polars-parquet/src/arrow/write/utils.rs @@ -4,7 +4,7 @@ use polars_error::*; use super::{Version, WriteOptions}; use crate::parquet::compression::CompressionOptions; -use crate::parquet::encoding::hybrid_rle::encode; +use crate::parquet::encoding::hybrid_rle::encode_bool; use crate::parquet::encoding::Encoding; use crate::parquet::metadata::Descriptor; use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}; @@ -14,7 +14,7 @@ use crate::parquet::statistics::ParquetStatistics; fn encode_iter_v1>(buffer: &mut Vec, iter: I) -> PolarsResult<()> { buffer.extend_from_slice(&[0; 4]); let start = buffer.len(); - encode::(buffer, iter, 1)?; + encode_bool(buffer, iter)?; let end = buffer.len(); let length = end - start; @@ -25,7 +25,7 @@ fn encode_iter_v1>(buffer: &mut Vec, iter: I) -> Po } fn encode_iter_v2>(writer: &mut Vec, iter: I) -> PolarsResult<()> { - Ok(encode::(writer, iter, 1)?) + Ok(encode_bool(writer, iter)?) } fn encode_iter>( diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs index 963499cf324f..1c4dd67ccec7 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs @@ -3,216 +3,98 @@ use std::io::Write; use super::bitpacked_encode; use crate::parquet::encoding::{bitpacked, ceil8, uleb128}; -// Arbitrary value that balances memory usage and storage overhead -const MAX_VALUES_PER_LITERAL_RUN: usize = (1 << 10) * 8; - -pub trait Encoder { - fn bitpacked_encode>( - writer: &mut W, - iterator: I, - num_bits: usize, - ) -> std::io::Result<()>; - - fn run_length_encode( - writer: &mut W, - run_length: usize, - value: T, - bit_width: u32, - ) -> std::io::Result<()>; -} +/// RLE-hybrid encoding of `u32`. This currently only yields bitpacked values. +pub fn encode_u32>( + writer: &mut W, + iterator: I, + num_bits: u32, +) -> std::io::Result<()> { + let num_bits = num_bits as u8; + // the length of the iterator. + let length = iterator.size_hint().1.unwrap(); -const U32_BLOCK_LEN: usize = 32; + // write the length + indicator + let mut header = ceil8(length) as u64; + header <<= 1; + header |= 1; // it is bitpacked => first bit is set + let mut container = [0; 10]; + let used = uleb128::encode(header, &mut container); + writer.write_all(&container[..used])?; -impl Encoder for u32 { - fn bitpacked_encode>( - writer: &mut W, - mut iterator: I, - num_bits: usize, - ) -> std::io::Result<()> { - // the length of the iterator. - let length = iterator.size_hint().1.unwrap(); - - let mut header = ceil8(length) as u64; - header <<= 1; - header |= 1; // it is bitpacked => first bit is set - let mut container = [0; 10]; - let used = uleb128::encode(header, &mut container); - writer.write_all(&container[..used])?; - - let chunks = length / U32_BLOCK_LEN; - let remainder = length - chunks * U32_BLOCK_LEN; - let mut buffer = [0u32; U32_BLOCK_LEN]; - - // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32 - let compressed_chunk_size = 4 * num_bits; - - for _ in 0..chunks { - iterator - .by_ref() - .take(U32_BLOCK_LEN) - .zip(buffer.iter_mut()) - .for_each(|(item, buf)| *buf = item); - - let mut packed = [0u8; 4 * U32_BLOCK_LEN]; - bitpacked::encode_pack::(&buffer, num_bits, packed.as_mut()); - writer.write_all(&packed[..compressed_chunk_size])?; - } - - if remainder != 0 { - // Must be careful here to ensure we write a multiple of `num_bits` - // (the bit width) to align with the spec. Some readers also rely on - // this - see https://github.com/pola-rs/polars/pull/13883. - - // this is ceil8(remainder * num_bits), but we ensure the output is a - // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits - let compressed_remainder_size = ceil8(remainder) * num_bits; - iterator - .by_ref() - .take(remainder) - .zip(buffer.iter_mut()) - .for_each(|(item, buf)| *buf = item); - - let mut packed = [0u8; 4 * U32_BLOCK_LEN]; - bitpacked::encode_pack(&buffer[..remainder], num_bits, packed.as_mut()); - writer.write_all(&packed[..compressed_remainder_size])?; - }; - Ok(()) - } + bitpacked_encode_u32(writer, iterator, num_bits as usize)?; - fn run_length_encode( - writer: &mut W, - run_length: usize, - value: u32, - bit_width: u32, - ) -> std::io::Result<()> { - // write the length + indicator - let mut header = run_length as u64; - header <<= 1; - let mut container = [0; 10]; - let used = uleb128::encode(header, &mut container); - writer.write_all(&container[..used])?; - - let num_bytes = ceil8(bit_width as usize); - let bytes = value.to_le_bytes(); - writer.write_all(&bytes[..num_bytes])?; - Ok(()) - } + Ok(()) } -impl Encoder for bool { - fn bitpacked_encode>( - writer: &mut W, - iterator: I, - _num_bits: usize, - ) -> std::io::Result<()> { - // the length of the iterator. - let length = iterator.size_hint().1.unwrap(); - - let mut header = ceil8(length) as u64; - header <<= 1; - header |= 1; // it is bitpacked => first bit is set - let mut container = [0; 10]; - let used = uleb128::encode(header, &mut container); - writer.write_all(&container[..used])?; - bitpacked_encode(writer, iterator)?; - Ok(()) - } +const U32_BLOCK_LEN: usize = 32; - fn run_length_encode( - writer: &mut W, - run_length: usize, - value: bool, - _bit_width: u32, - ) -> std::io::Result<()> { - // write the length + indicator - let mut header = run_length as u64; - header <<= 1; - let mut container = [0; 10]; - let used = uleb128::encode(header, &mut container); - writer.write_all(&container[..used])?; - writer.write_all(&(value as u8).to_le_bytes())?; - Ok(()) +fn bitpacked_encode_u32>( + writer: &mut W, + mut iterator: I, + num_bits: usize, +) -> std::io::Result<()> { + // the length of the iterator. + let length = iterator.size_hint().1.unwrap(); + + let chunks = length / U32_BLOCK_LEN; + let remainder = length - chunks * U32_BLOCK_LEN; + let mut buffer = [0u32; U32_BLOCK_LEN]; + + // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32 + let compressed_chunk_size = 4 * num_bits; + + for _ in 0..chunks { + iterator + .by_ref() + .take(U32_BLOCK_LEN) + .zip(buffer.iter_mut()) + .for_each(|(item, buf)| *buf = item); + + let mut packed = [0u8; 4 * U32_BLOCK_LEN]; + bitpacked::encode_pack::(&buffer, num_bits, packed.as_mut()); + writer.write_all(&packed[..compressed_chunk_size])?; } + + if remainder != 0 { + // Must be careful here to ensure we write a multiple of `num_bits` + // (the bit width) to align with the spec. Some readers also rely on + // this - see https://github.com/pola-rs/polars/pull/13883. + + // this is ceil8(remainder * num_bits), but we ensure the output is a + // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits + let compressed_remainder_size = ceil8(remainder) * num_bits; + iterator + .by_ref() + .take(remainder) + .zip(buffer.iter_mut()) + .for_each(|(item, buf)| *buf = item); + + let mut packed = [0u8; 4 * U32_BLOCK_LEN]; + bitpacked::encode_pack(&buffer, num_bits, packed.as_mut()); + writer.write_all(&packed[..compressed_remainder_size])?; + }; + Ok(()) } -#[allow(clippy::comparison_chain)] -pub fn encode, W: Write, I: Iterator>( +/// the bitpacked part of the encoder. +pub fn encode_bool>( writer: &mut W, iterator: I, - num_bits: u32, ) -> std::io::Result<()> { - let mut consecutive_repeats: usize = 0; - let mut previous_val = T::default(); - let mut buffered_bits = [previous_val; MAX_VALUES_PER_LITERAL_RUN]; - let mut buffer_idx = 0; - let mut literal_run_idx = 0; - for val in iterator { - if val == previous_val { - consecutive_repeats += 1; - // Run is long enough to RLE, no need to buffer values - if consecutive_repeats >= 8 { - // Run is long enough to RLE, no need to buffer values - if consecutive_repeats > 8 { - continue; - } else { - // Ensure literal run has multiple of 8 values - // Take from consecutive repeats if needed to pad up - let literal_padding = (8 - (literal_run_idx % 8)) % 8; - consecutive_repeats -= literal_padding; - literal_run_idx += literal_padding; - } - } - // Too short to RLE, continue to buffer values - } else if consecutive_repeats > 8 { - // Flush literal run, if any, before RLE run - if literal_run_idx > 0 { - T::bitpacked_encode( - writer, - buffered_bits.iter().copied().take(literal_run_idx), - num_bits as usize, - )?; - literal_run_idx = 0; - } - T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?; - consecutive_repeats = 1; - buffer_idx = 0; - } else { - // Not enough consecutive repeats to RLE, extend literal run - literal_run_idx = buffer_idx; - consecutive_repeats = 1; - } - // If buffer is full, bit-pack as literal run and reset - if buffer_idx == MAX_VALUES_PER_LITERAL_RUN { - T::bitpacked_encode( - writer, - buffered_bits.iter().copied().take(literal_run_idx), - num_bits as usize, - )?; - // Consecutive repeats may be consolidated into literal run - consecutive_repeats -= buffer_idx - literal_run_idx; - buffer_idx = 0; - literal_run_idx = 0; - } - buffered_bits[buffer_idx] = val; - previous_val = val; - buffer_idx += 1; - } - // Not enough consecutive repeats to RLE, extend literal run - if consecutive_repeats <= 8 { - literal_run_idx = buffer_idx; - consecutive_repeats = 0; - } - if literal_run_idx > 0 { - T::bitpacked_encode( - writer, - buffered_bits.iter().copied().take(literal_run_idx), - num_bits as usize, - )?; - } - if consecutive_repeats > 8 { - T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?; - } - Ok(()) + // the length of the iterator. + let length = iterator.size_hint().1.unwrap(); + + // write the length + indicator + let mut header = ceil8(length) as u64; + header <<= 1; + header |= 1; // it is bitpacked => first bit is set + let mut container = [0; 10]; + let used = uleb128::encode(header, &mut container); + + writer.write_all(&container[..used])?; + + // encode the iterator + bitpacked_encode(writer, iterator) } #[cfg(test)] @@ -226,7 +108,7 @@ mod tests { let mut vec = vec![]; - encode::(&mut vec, iter, 1)?; + encode_bool(&mut vec, iter)?; assert_eq!(vec, vec![(2 << 1 | 1), 0b10011101u8, 0b00011101]); @@ -237,10 +119,9 @@ mod tests { fn bool_from_iter() -> std::io::Result<()> { let mut vec = vec![]; - encode::( + encode_bool( &mut vec, vec![true, true, true, true, true, true, true, true].into_iter(), - 1, )?; assert_eq!(vec, vec![(1 << 1 | 1), 0b11111111]); @@ -251,7 +132,7 @@ mod tests { fn test_encode_u32() -> std::io::Result<()> { let mut vec = vec![]; - encode::(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?; + encode_u32(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?; assert_eq!( vec, @@ -272,7 +153,7 @@ mod tests { let values = (0..128).map(|x| x % 4); - encode::(&mut vec, values, 2)?; + encode_u32(&mut vec, values, 2)?; let length = 128; let expected = 0b11_10_01_00u8; @@ -289,7 +170,7 @@ mod tests { let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3].into_iter(); let mut vec = vec![]; - encode::(&mut vec, values, 2)?; + encode_u32(&mut vec, values, 2)?; let expected = vec![5, 207, 254, 247, 51]; assert_eq!(expected, vec); diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs index 89816f87fb54..3dc072552524 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs @@ -4,7 +4,7 @@ mod decoder; mod encoder; pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter}; pub use decoder::Decoder; -pub use encoder::encode; +pub use encoder::{encode_bool, encode_u32}; use polars_utils::iter::FallibleIterator; use super::bitpacked; @@ -137,7 +137,7 @@ mod tests { let data = (0..1000).collect::>(); - encode::(&mut buffer, data.iter().cloned(), num_bits).unwrap(); + encode_u32(&mut buffer, data.iter().cloned(), num_bits).unwrap(); let decoder = HybridRleDecoder::try_new(&buffer, num_bits, data.len())?; diff --git a/crates/polars/tests/it/io/parquet/write/binary.rs b/crates/polars/tests/it/io/parquet/write/binary.rs index dd4e3a942c46..3112f115c3e7 100644 --- a/crates/polars/tests/it/io/parquet/write/binary.rs +++ b/crates/polars/tests/it/io/parquet/write/binary.rs @@ -1,4 +1,4 @@ -use polars_parquet::parquet::encoding::hybrid_rle::encode; +use polars_parquet::parquet::encoding::hybrid_rle::encode_bool; use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::Result; use polars_parquet::parquet::metadata::Descriptor; @@ -25,7 +25,7 @@ fn unzip_option(array: &[Option>]) -> Result<(Vec, Vec)> { false } }); - encode::(&mut validity, iter, 1)?; + encode_bool(&mut validity, iter)?; // write the length, now that it is known let mut validity = validity.into_inner(); diff --git a/crates/polars/tests/it/io/parquet/write/primitive.rs b/crates/polars/tests/it/io/parquet/write/primitive.rs index e5da32252e99..3b5ae150896a 100644 --- a/crates/polars/tests/it/io/parquet/write/primitive.rs +++ b/crates/polars/tests/it/io/parquet/write/primitive.rs @@ -1,4 +1,4 @@ -use polars_parquet::parquet::encoding::hybrid_rle::encode; +use polars_parquet::parquet::encoding::hybrid_rle::encode_bool; use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::Result; use polars_parquet::parquet::metadata::Descriptor; @@ -24,7 +24,7 @@ fn unzip_option(array: &[Option]) -> Result<(Vec, Vec) false } }); - encode::(&mut validity, iter, 1)?; + encode_bool(&mut validity, iter)?; // write the length, now that it is known let mut validity = validity.into_inner(); diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 846b4252e548..12ac1a835b40 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -892,46 +892,3 @@ def test_no_glob_windows(tmp_path: Path) -> None: df.write_parquet(str(p2)) assert_frame_equal(pl.scan_parquet(str(p1), glob=False).collect(), df) - - -@pytest.mark.slow() -def test_hybrid_rle() -> None: - df = pl.DataFrame( - { - # Test primitive types - "i64": pl.repeat(int(2**63 - 1), n=10000, dtype=pl.Int64, eager=True), - "u64": pl.repeat(int(2**64 - 1), n=10000, dtype=pl.UInt64, eager=True), - "i8": pl.repeat(-int(2**7 - 1), n=10000, dtype=pl.Int8, eager=True), - "u8": pl.repeat(int(2**8 - 1), n=10000, dtype=pl.UInt8, eager=True), - "string": pl.repeat("a", n=10000, dtype=pl.String, eager=True), - "categorical": pl.Series((["a"] * 9 + ["b"]) * 1000, dtype=pl.Categorical), - # Test filling up bit-packing buffer - "large_bit_pack": ([0] * 5 + [1] * 5) * 1000, - # Test mix of bit-packed and RLE runs - "bit_pack_and_rle": ( - [0] + [1] * 19 + [2] * 8 + [3] * 12 + [4] * 5 + [5] * 5 - ) - * 200, - # Test some null values - "nulls_included": ( - [None] + [1] * 19 + [None] * 8 + [3] * 12 + [4] * 5 + [None] * 5 - ) - * 200, - # Test filling up bit-packing buffer for encode_bool, - # which is only used to encode validities - # Also checks that runs are handled correctly if buffer - # is flushed (at MAX_VALUES_PER_LITERAL_RUN values) - "large_bit_pack_validity": [0, None] * 4092 - + [0] * 9 - + [1] * 9 - + [2] * 10 - + [0] * 1788, - } - ) - f = io.BytesIO() - df.write_parquet(f) - f.seek(0) - for column in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"]: - assert "RLE_DICTIONARY" in column["encodings"] - f.seek(0) - assert_frame_equal(pl.read_parquet(f), df)