Skip to content

Commit

Permalink
refactor(rust): Remove unneeded code (#16838)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jun 9, 2024
1 parent 5553c3e commit e74a63d
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 183 deletions.
37 changes: 3 additions & 34 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use rayon::prelude::*;
#[cfg(feature = "algorithm_group_by")]
use crate::chunked_array::ops::unique::is_unique_helper;
use crate::prelude::*;
use crate::utils::{slice_offsets, split_ca, split_df, try_get_supertype, NoNull};
#[cfg(feature = "row_hash")]
use crate::utils::split_df;
use crate::utils::{slice_offsets, try_get_supertype, NoNull};

#[cfg(feature = "dataframe_arithmetic")]
mod arithmetic;
Expand Down Expand Up @@ -1623,36 +1625,6 @@ impl DataFrame {
opt_idx.and_then(|idx| self.select_at_idx_mut(idx))
}

/// Filters the rows by `mask`, parallelizing over row ranges ("vertically")
/// instead of over columns ("horizontally").
///
/// The mask and the frame are each split into `n_threads` pieces, every
/// (mask piece, frame piece) pair is filtered on the thread pool, and the
/// partial frames are stacked back together in order.
/// This yields a DataFrame with `n_chunks == n_threads`.
///
/// # Errors
/// Returns an error if splitting the mask, filtering any column, or stacking
/// the partial frames fails.
///
/// # Panics
/// Panics if the frame split produces no pieces at all.
fn filter_vertical(&mut self, mask: &BooleanChunked) -> PolarsResult<Self> {
    let n_threads = POOL.current_num_threads();

    // Propagate a split failure to the caller instead of panicking.
    let masks = split_ca(mask, n_threads)?;
    let dfs = split_df(self, n_threads, false);
    // NOTE(review): the zip below pairs pieces positionally, so it assumes
    // `split_ca` and `split_df` cut at the same row boundaries — TODO confirm.
    let dfs: PolarsResult<Vec<_>> = POOL.install(|| {
        masks
            .par_iter()
            .zip(dfs)
            .map(|(mask, df)| {
                let cols = df
                    .columns
                    .iter()
                    .map(|s| s.filter(mask))
                    .collect::<PolarsResult<_>>()?;
                // All columns were filtered with the same mask piece;
                // presumably that is the invariant `new_no_checks` relies on
                // (equal column lengths) — TODO confirm.
                Ok(unsafe { DataFrame::new_no_checks(cols) })
            })
            .collect()
    });

    let mut iter = dfs?.into_iter();
    let first = iter.next().unwrap();
    // Stack the remaining pieces onto the first one, surfacing any stacking
    // error through the `PolarsResult` return value.
    iter.try_fold(first, |mut acc, df| {
        acc.vstack_mut(&df)?;
        Ok(acc)
    })
}

/// Take the [`DataFrame`] rows by a boolean mask.
///
/// # Example
Expand All @@ -1665,9 +1637,6 @@ impl DataFrame {
/// }
/// ```
pub fn filter(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
    // Setting the `POLARS_VERT_PAR` environment variable (to any value)
    // switches to the row-wise parallel implementation.
    let use_vertical = std::env::var("POLARS_VERT_PAR").is_ok();
    if use_vertical {
        return self.clone().filter_vertical(mask);
    }

    // Default path: filter every column with the same mask, in parallel.
    let filtered = self.try_apply_columns_par(&|column| column.filter(mask))?;
    let out = unsafe { DataFrame::new_no_checks(filtered) };
    Ok(out)
}
Expand Down
97 changes: 1 addition & 96 deletions crates/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@ use arrow::offset::Offsets;
pub use from::*;
pub use iterator::{SeriesIter, SeriesPhysIter};
use num_traits::NumCast;
use rayon::prelude::*;
pub use series_trait::{IsSorted, *};

use crate::chunked_array::cast::CastOptions;
use crate::chunked_array::metadata::{Metadata, MetadataFlags};
#[cfg(feature = "zip_with")]
use crate::series::arithmetic::coerce_lhs_rhs;
use crate::utils::{
_split_offsets, handle_casting_failures, materialize_dyn_int, split_ca, split_series, Wrap,
};
use crate::utils::{handle_casting_failures, materialize_dyn_int, Wrap};
use crate::POOL;

/// # Series
Expand Down Expand Up @@ -618,41 +615,6 @@ impl Series {
}
}

// Appends the per-thread partial results in order into a single `Series`,
// optionally rechunking the combined result.
//
// Panics if `s` is empty or if appending two parts fails.
fn finish_take_threaded(&self, s: Vec<Series>, rechunk: bool) -> Series {
    let mut parts = s.into_iter();
    let mut combined = parts.next().unwrap();
    for part in parts {
        combined.append(&part).unwrap();
    }

    if !rechunk {
        return combined;
    }
    combined.rechunk()
}

// Runs `func` over `(offset, len)` ranges covering `0..len` on the thread
// pool and stitches the partial results back together.
//
// `func` is taken as a `dyn Fn` trait object (not a generic) to reduce bloat.
fn threaded_op(
    &self,
    rechunk: bool,
    len: usize,
    func: &(dyn Fn(usize, usize) -> PolarsResult<Series> + Send + Sync),
) -> PolarsResult<Series> {
    let ranges = _split_offsets(len, POOL.current_num_threads());

    let parts = POOL.install(|| {
        ranges
            .into_par_iter()
            .map(|(start, count)| func(start, count))
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    Ok(self.finish_take_threaded(parts, rechunk))
}

/// Take by index if ChunkedArray contains a single chunk.
///
/// # Safety
Expand All @@ -661,40 +623,6 @@ impl Series {
self.take_slice_unchecked(idx)
}

/// Take by index if ChunkedArray contains a single chunk.
///
/// # Safety
/// This doesn't check any bounds. Null validity is checked.
pub unsafe fn take_unchecked_threaded(&self, idx: &IdxCa, rechunk: bool) -> Series {
self.threaded_op(rechunk, idx.len(), &|offset, len| {
let idx = idx.slice(offset as i64, len);
Ok(self.take_unchecked(&idx))
})
.unwrap()
}

/// Take by index if ChunkedArray contains a single chunk.
///
/// # Safety
/// This doesn't check any bounds. Null validity is checked.
pub unsafe fn take_slice_unchecked_threaded(&self, idx: &[IdxSize], rechunk: bool) -> Series {
self.threaded_op(rechunk, idx.len(), &|offset, len| {
Ok(self.take_slice_unchecked(&idx[offset..offset + len]))
})
.unwrap()
}

/// Take by index, spreading the work over the thread pool. The result is a
/// new `Series`; `self` is left untouched.
///
/// # Notes
/// Out of bounds access doesn't Error but will return a Null value
/// (as stated by the original documentation — not verifiable from this
/// function alone).
pub fn take_threaded(&self, idx: &IdxCa, rechunk: bool) -> PolarsResult<Series> {
    let take_range = |offset: usize, len: usize| -> PolarsResult<Series> {
        let sub_idx = idx.slice(offset as i64, len);
        self.take(&sub_idx)
    };
    let total = idx.len();
    self.threaded_op(rechunk, total, &take_range)
}

/// Traverse and collect every nth element in a new array.
pub fn gather_every(&self, n: usize, offset: usize) -> Series {
let idx = ((offset as IdxSize)..self.len() as IdxSize)
Expand All @@ -704,29 +632,6 @@ impl Series {
unsafe { self.take_unchecked(&idx) }
}

/// Filter by boolean mask, spreading the work over the thread pool.
/// This operation clones data.
///
/// The mask and the series are each split into `n_threads` pieces, the
/// pieces are filtered pairwise in parallel, and the partial results are
/// appended back together (and rechunked when `rechunk` is `true`).
///
/// # Errors
/// Returns an error if splitting the inputs or filtering any piece fails.
///
/// # Panics
/// Panics if combining the per-thread results fails.
pub fn filter_threaded(&self, filter: &BooleanChunked, rechunk: bool) -> PolarsResult<Series> {
    // A length-1 mask is a broadcasting filter. It cannot be split over
    // threads, and besides, such a filter is effectively a no-op, so the
    // standard filter is used instead.
    if filter.len() == 1 {
        return self.filter(filter);
    }
    let n_threads = POOL.current_num_threads();
    // Propagate split failures to the caller instead of panicking.
    let filters = split_ca(filter, n_threads)?;
    let series = split_series(self, n_threads)?;

    let series: PolarsResult<Vec<_>> = POOL.install(|| {
        filters
            .par_iter()
            .zip(series)
            .map(|(filter, s)| s.filter(filter))
            .collect()
    });

    Ok(self.finish_take_threaded(series?, rechunk))
}

#[cfg(feature = "dot_product")]
pub fn dot(&self, other: &Series) -> PolarsResult<f64> {
(self * other).sum::<f64>()
Expand Down
37 changes: 1 addition & 36 deletions crates/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,37 +81,6 @@ pub(crate) fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
}
}

// Splits a sliceable container `$ca` into `$n` consecutive pieces and
// evaluates to `Ok(Vec<_>)` of those pieces.
//
// * `$ca` - expression supporting `.clone()`, `.len()` and `.slice(offset, len)`.
// * `$n`  - number of pieces to produce.
// * `$ty` - integer type the offset is cast to before calling `slice`.
//
// Must be expanded inside a function returning `Result<Vec<_>, _>`: the
// `$n == 1` fast path uses `return`, which exits the *enclosing function*.
//
// NOTE(review): `$n` is substituted several times, so a side-effecting
// expression would be evaluated repeatedly; `$n == 0` would reach the
// division below.
macro_rules! split_array {
($ca: expr, $n: expr, $ty : ty) => {{
// Nothing to split: hand back a single clone of the input.
if $n == 1 {
return Ok(vec![$ca.clone()]);
}
let total_len = $ca.len();
// Integer division; the remainder is absorbed by the last piece.
let chunk_size = total_len / $n;

let v = (0..$n)
.map(|i| {
let offset = i * chunk_size;
// The final piece runs to the end so no elements are dropped.
let len = if i == ($n - 1) {
total_len - offset
} else {
chunk_size
};
$ca.slice((i * chunk_size) as $ty, len)
})
.collect();
Ok(v)
}};
}

/// Splits `ca` into `n` consecutive slices without flattening chunks, so a
/// returned piece may itself still consist of several chunks.
pub fn split_ca<T: PolarsDataType>(
    ca: &ChunkedArray<T>,
    n: usize,
) -> PolarsResult<Vec<ChunkedArray<T>>> {
    split_array!(ca, n, i64)
}

// Prefer this one over `split_ca`: it only computes the split points, so the
// slicing itself (and its null-count work) can be pushed into the thread pool.
// Returns a `Vec` of `(offset, length)` tuples.
#[doc(hidden)]
Expand All @@ -135,11 +104,6 @@ pub fn _split_offsets(len: usize, n: usize) -> Vec<(usize, usize)> {
}
}

#[doc(hidden)]
// Splits `s` into `n` consecutive slices by expanding `split_array!` with
// `i64` slice offsets; for `n == 1` the macro returns a single clone of `s`.
pub fn split_series(s: &Series, n: usize) -> PolarsResult<Vec<Series>> {
split_array!(s, n, i64)
}

#[allow(clippy::len_without_is_empty)]
pub trait Container: Clone {
fn slice(&self, offset: i64, len: usize) -> Self;
Expand Down Expand Up @@ -237,6 +201,7 @@ fn split_impl<C: Container>(container: &C, target: usize, chunk_size: usize) ->
out
}

/// Splits, but doesn't flatten chunks. E.g. a container can still have multiple chunks.
pub fn split<C: Container>(container: &C, target: usize) -> Vec<C> {
let total_len = container.len();
if total_len == 0 {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/utils/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ where

pub fn handle_casting_failures(input: &Series, output: &Series) -> PolarsResult<()> {
let failure_mask = !input.is_null() & output.is_null();
let failures = input.filter_threaded(&failure_mask, false)?;
let failures = input.filter(&failure_mask)?;

let additional_info = match (input.dtype(), output.dtype()) {
(DataType::String, DataType::Date | DataType::Datetime(_, _)) => {
Expand Down
8 changes: 1 addition & 7 deletions crates/polars-ops/src/chunked_array/top_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,6 @@ fn top_k_by_impl(

let idx = _arg_bottom_k(k, by, &mut sort_options)?;

let result = unsafe {
if multithreaded {
src.take_unchecked_threaded(&idx.into_inner(), false)
} else {
src.take_unchecked(&idx.into_inner())
}
};
let result = unsafe { src.take_unchecked(&idx.into_inner()) };
Ok(result)
}
2 changes: 1 addition & 1 deletion crates/polars-ops/src/frame/join/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod single_keys_outer;
mod single_keys_semi_anti;
pub(super) mod sort_merge;
use arrow::array::ArrayRef;
use polars_core::utils::{_set_partition_size, split_ca};
use polars_core::utils::_set_partition_size;
use polars_core::POOL;
use polars_utils::index::ChunkId;
pub(super) use single_keys::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow::array::PrimitiveArray;
use polars_core::utils::split;
use polars_core::with_match_physical_float_polars_type;
use polars_utils::hashing::DirtyHash;
use polars_utils::nulls::IsNull;
Expand Down Expand Up @@ -261,8 +262,8 @@ where
{
let n_threads = POOL.current_num_threads();
let (a, b, swapped) = det_hash_prone_order!(left, right);
let splitted_a = split_ca(a, n_threads).unwrap();
let splitted_b = split_ca(b, n_threads).unwrap();
let splitted_a = split(a, n_threads);
let splitted_b = split(b, n_threads);
let splitted_a = get_arrays(&splitted_a);
let splitted_b = get_arrays(&splitted_b);

Expand Down Expand Up @@ -346,8 +347,8 @@ where
<Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash,
{
let n_threads = POOL.current_num_threads();
let splitted_a = split_ca(left, n_threads).unwrap();
let splitted_b = split_ca(right, n_threads).unwrap();
let splitted_a = split(left, n_threads);
let splitted_b = split(right, n_threads);
match (
left.null_count(),
right.null_count(),
Expand Down Expand Up @@ -405,8 +406,8 @@ where
let (a, b, swapped) = det_hash_prone_order!(ca_in, other);

let n_partitions = _set_partition_size();
let splitted_a = split_ca(a, n_partitions).unwrap();
let splitted_b = split_ca(b, n_partitions).unwrap();
let splitted_a = split(a, n_partitions);
let splitted_b = split(b, n_partitions);

match (a.null_count(), b.null_count()) {
(0, 0) => {
Expand Down Expand Up @@ -496,8 +497,8 @@ where
<Option<T::Native> as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull,
{
let n_threads = POOL.current_num_threads();
let splitted_a = split_ca(left, n_threads).unwrap();
let splitted_b = split_ca(right, n_threads).unwrap();
let splitted_a = split(left, n_threads);
let splitted_b = split(right, n_threads);
match (
left.null_count(),
right.null_count(),
Expand Down

0 comments on commit e74a63d

Please sign in to comment.