perf: Don't create small chunks in parallel collect. (#16845)
ritchie46 authored Jun 10, 2024
1 parent c90869d commit 96ef044
Showing 4 changed files with 29 additions and 16 deletions.
14 changes: 7 additions & 7 deletions crates/polars-core/src/chunked_array/from_iterator_par.rs
@@ -82,21 +82,21 @@ where
{
fn from_par_iter<I: IntoParallelIterator<Item = Option<T::Native>>>(iter: I) -> Self {
let chunks = collect_into_linked_list(iter, MutablePrimitiveArray::new);
- Self::from_chunk_iter("", chunks)
+ Self::from_chunk_iter("", chunks).optional_rechunk()
}
}

impl FromParallelIterator<bool> for BooleanChunked {
fn from_par_iter<I: IntoParallelIterator<Item = bool>>(iter: I) -> Self {
let chunks = collect_into_linked_list(iter, MutableBooleanArray::new);
- Self::from_chunk_iter("", chunks)
+ Self::from_chunk_iter("", chunks).optional_rechunk()
}
}

impl FromParallelIterator<Option<bool>> for BooleanChunked {
fn from_par_iter<I: IntoParallelIterator<Item = Option<bool>>>(iter: I) -> Self {
let chunks = collect_into_linked_list(iter, MutableBooleanArray::new);
- Self::from_chunk_iter("", chunks)
+ Self::from_chunk_iter("", chunks).optional_rechunk()
}
}

@@ -106,7 +106,7 @@ where
{
fn from_par_iter<I: IntoParallelIterator<Item = Ptr>>(iter: I) -> Self {
let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
- Self::from_chunk_iter("", chunks)
+ Self::from_chunk_iter("", chunks).optional_rechunk()
}
}

@@ -116,7 +116,7 @@ where
{
fn from_par_iter<I: IntoParallelIterator<Item = Ptr>>(iter: I) -> Self {
let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
- Self::from_chunk_iter("", chunks)
+ Self::from_chunk_iter("", chunks).optional_rechunk()
}
}

@@ -126,7 +126,7 @@ where
{
fn from_par_iter<I: IntoParallelIterator<Item = Option<Ptr>>>(iter: I) -> Self {
let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
- Self::from_chunk_iter("", chunks)
+ Self::from_chunk_iter("", chunks).optional_rechunk()
}
}

@@ -136,7 +136,7 @@ where
{
fn from_par_iter<I: IntoParallelIterator<Item = Option<Ptr>>>(iter: I) -> Self {
let chunks = collect_into_linked_list(iter, MutableBinaryViewArray::new);
- Self::from_chunk_iter("", chunks)
+ Self::from_chunk_iter("", chunks).optional_rechunk()
}
}

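All seven impls above now funnel the per-thread results through `optional_rechunk` before returning. A minimal sketch of the observable effect, assuming a crate depending on `polars` and `rayon` (the exact chunk count depends on how rayon splits the work, so the printed number is illustrative, not from the commit):

use polars::prelude::*;
use rayon::prelude::*;

fn main() {
    // Parallel collect: each rayon task fills its own mutable array, so
    // the raw result is a linked list of potentially tiny chunks.
    let ca: Float64Chunked = (0..1_000)
        .into_par_iter()
        .map(|v| Some(v as f64))
        .collect();

    // With this commit, `optional_rechunk` merges tiny chunks, so
    // downstream kernels see (near-)contiguous memory.
    println!("n_chunks: {}", ca.chunks().len());
}
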
15 changes: 14 additions & 1 deletion crates/polars-core/src/chunked_array/mod.rs
@@ -144,6 +144,19 @@ pub struct ChunkedArray<T: PolarsDataType> {
}

impl<T: PolarsDataType> ChunkedArray<T> {
+ fn should_rechunk(&self) -> bool {
+     self.chunks.len() > 1 && self.chunks.len() > self.len() / 3
+ }
+
+ fn optional_rechunk(self) -> Self {
+     // Rechunk if we have many small chunks.
+     if self.should_rechunk() {
+         self.rechunk()
+     } else {
+         self
+     }
+ }
+
/// Create a new [`ChunkedArray`] and compute its `length` and `null_count`.
///
/// If you want to explicitly set the `length` and `null_count`, look at
@@ -181,7 +194,7 @@ impl<T: PolarsDataType> ChunkedArray<T> {

/// Get a reference to the used [`Metadata`]
///
- /// This returns a reference to an empty [`Metadata`] if its unset for this [`ChunkedArray`].
+ /// This returns a reference to an empty [`Metadata`] if it's unset for this [`ChunkedArray`].
#[inline(always)]
pub fn effective_metadata(&self) -> &Metadata<T> {
self.md.as_ref().map_or(&Metadata::DEFAULT, AsRef::as_ref)
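
The predicate added above fires only when chunks are tiny on average: more than one chunk, and more chunks than a third of the total element count, i.e. a mean chunk length below 3. A standalone restatement of the same arithmetic (hypothetical free function, mirroring `should_rechunk`):

fn should_rechunk(n_chunks: usize, total_len: usize) -> bool {
    // More than one chunk, and an average chunk length below 3.
    n_chunks > 1 && n_chunks > total_len / 3
}

fn main() {
    assert!(!should_rechunk(1, 2));      // a single chunk is never rechunked
    assert!(should_rechunk(500, 1_000)); // mean length 2: merge
    assert!(!should_rechunk(4, 1_000));  // a few large chunks: leave as-is
}

Note the asymmetry: rechunking copies data, so it only pays off when per-chunk overhead dominates, as with the tiny chunks a parallel collect can produce.
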
15 changes: 7 additions & 8 deletions crates/polars-expr/src/expressions/window.rs
@@ -688,7 +688,7 @@ fn set_by_groups(

macro_rules! dispatch {
($ca:expr) => {{
- set_numeric($ca, groups, len)
+ Some(set_numeric($ca, groups, len))
}};
}
downcast_as_macro_arg_physical!(&s, dispatch).map(|s| s.cast(dtype).unwrap())
@@ -697,7 +697,7 @@ fn set_by_groups(
}
}

- fn set_numeric<T>(ca: &ChunkedArray<T>, groups: &GroupsProxy, len: usize) -> Option<Series>
+ fn set_numeric<T>(ca: &ChunkedArray<T>, groups: &GroupsProxy, len: usize) -> Series
where
T: PolarsNumericType,
ChunkedArray<T>: IntoSeries,
@@ -709,10 +709,10 @@ where
let sync_ptr_values = unsafe { SyncPtr::new(ptr) };

if ca.null_count() == 0 {
+ let ca = ca.rechunk();
match groups {
GroupsProxy::Idx(groups) => {
- // this should always succeed as we don't expect any chunks after an aggregation
- let agg_vals = ca.cont_slice().ok()?;
+ let agg_vals = ca.cont_slice().expect("rechunked");
POOL.install(|| {
agg_vals
.par_iter()
@@ -727,8 +727,7 @@
})
},
GroupsProxy::Slice { groups, .. } => {
- // this should always succeed as we don't expect any chunks after an aggregation
- let agg_vals = ca.cont_slice().ok()?;
+ let agg_vals = ca.cont_slice().expect("rechunked");
POOL.install(|| {
agg_vals
.par_iter()
@@ -748,7 +747,7 @@

// SAFETY: we have written all slots
unsafe { values.set_len(len) }
- Some(ChunkedArray::new_vec(ca.name(), values).into_series())
+ ChunkedArray::new_vec(ca.name(), values).into_series()
} else {
// We don't use a mutable bitmap as bits will have race conditions!
// A single byte might alias if we write from single threads.
@@ -826,6 +825,6 @@ where
values.into(),
Some(validity),
);
- Some(Series::try_from((ca.name(), arr.boxed())).unwrap())
+ Series::try_from((ca.name(), arr.boxed())).unwrap()
}
}
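
The window.rs change moves the fallibility out of the signature: previously `cont_slice().ok()?` returned `None` whenever the aggregation result happened to be multi-chunk, while `set_numeric` now rechunks once up front so `cont_slice` cannot fail and the function returns a plain `Series`. A sketch of that rechunk-then-expect pattern in isolation (hypothetical helper, assuming a null-free numeric array):

use polars::prelude::*;

fn sum_contiguous(ca: &Float64Chunked) -> f64 {
    assert_eq!(ca.null_count(), 0);
    // `cont_slice` only succeeds on a single-chunk, null-free array;
    // rechunking first turns the old early-return into an invariant.
    let ca = ca.rechunk();
    let vals = ca.cont_slice().expect("rechunked and null-free");
    vals.iter().sum()
}
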
1 change: 1 addition & 0 deletions crates/polars/tests/it/lazy/functions.rs
@@ -1,6 +1,7 @@
use super::*;

#[test]
+ #[cfg(all(feature = "concat_str", feature = "strings"))]
fn test_format_str() {
let a = df![
"a" => [1, 2],
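
The added `#[cfg(...)]` keeps the test out of builds where either feature is disabled, presumably because the `format_str` machinery it exercises is only compiled when both are enabled. A generic sketch of the gating pattern (illustrative test name, not from this commit):

#[test]
#[cfg(all(feature = "concat_str", feature = "strings"))]
fn needs_both_features() {
    // The test body may use APIs behind either feature; the whole
    // function disappears when `cargo test` omits one of them.
}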
