diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index d0860f3faccd..acfa4b77d8f2 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -153,6 +153,13 @@ impl LazyFrame { self } + /// Check if operations are order dependent and unset maintaining_order if + /// the order would not be observed. + pub fn with_check_order(mut self, toggle: bool) -> Self { + self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle); + self + } + /// Toggle predicate pushdown optimization. pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self { self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle); diff --git a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs index e3943ec03d17..4450b956a1c2 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs @@ -169,11 +169,7 @@ pub(crate) fn insert_streaming_nodes( state.operators_sinks.push(PipelineNode::Operator(root)); stack.push(StackFrame::new(*input, state, current_idx)) }, - HStack { input, exprs, .. } - if exprs - .iter() - .all(|e| is_elementwise_rec(expr_arena.get(e.node()), expr_arena)) => - { + HStack { input, exprs, .. } if all_elementwise(exprs, expr_arena) => { state.streamable = true; state.operators_sinks.push(PipelineNode::Operator(root)); stack.push(StackFrame::new(*input, state, current_idx)) @@ -198,11 +194,7 @@ pub(crate) fn insert_streaming_nodes( state.operators_sinks.push(PipelineNode::Sink(root)); stack.push(StackFrame::new(*input, state, current_idx)) }, - Select { input, expr, .. } - if expr - .iter() - .all(|e| is_elementwise_rec(expr_arena.get(e.node()), expr_arena)) => - { + Select { input, expr, .. } if all_elementwise(expr, expr_arena) => { state.streamable = true; state.operators_sinks.push(PipelineNode::Operator(root)); stack.push(StackFrame::new(*input, state, current_idx)) diff --git a/crates/polars-plan/src/frame/opt_state.rs b/crates/polars-plan/src/frame/opt_state.rs index 699b4a09a0ce..04586e774a04 100644 --- a/crates/polars-plan/src/frame/opt_state.rs +++ b/crates/polars-plan/src/frame/opt_state.rs @@ -35,6 +35,9 @@ bitflags! { const FAST_PROJECTION = 1 << 14; /// Collapse slower joins with filters into faster joins. const COLLAPSE_JOINS = 1 << 15; + /// Check if operations are order dependent and unset maintaining_order if + /// the order would not be observed. + const CHECK_ORDER_OBSERVE = 1 << 16; } } diff --git a/crates/polars-plan/src/plans/aexpr/utils.rs b/crates/polars-plan/src/plans/aexpr/utils.rs index 834aa4ff6a75..c26af7cb0d3b 100644 --- a/crates/polars-plan/src/plans/aexpr/utils.rs +++ b/crates/polars-plan/src/plans/aexpr/utils.rs @@ -39,6 +39,15 @@ pub fn is_elementwise(stack: &mut UnitVec, ae: &AExpr, expr_arena: &Arena< true } +pub fn all_elementwise<'a, N>(nodes: &'a [N], expr_arena: &Arena) -> bool +where + Node: From<&'a N>, +{ + nodes + .iter() + .all(|n| is_elementwise_rec(expr_arena.get(n.into()), expr_arena)) +} + /// Recursive variant of `is_elementwise` pub fn is_elementwise_rec<'a>(mut ae: &'a AExpr, expr_arena: &'a Arena) -> bool { let mut stack = unitvec![]; diff --git a/crates/polars-plan/src/plans/conversion/join.rs b/crates/polars-plan/src/plans/conversion/join.rs index c233b69b6915..9aea4cd686fb 100644 --- a/crates/polars-plan/src/plans/conversion/join.rs +++ b/crates/polars-plan/src/plans/conversion/join.rs @@ -180,14 +180,9 @@ pub fn resolve_join( } // Every expression must be elementwise so that we are // guaranteed the keys for a join are all the same length. - let all_elementwise = |aexprs: &[ExprIR]| { - aexprs - .iter() - .all(|e| is_elementwise_rec(ctxt.expr_arena.get(e.node()), ctxt.expr_arena)) - }; polars_ensure!( - all_elementwise(&left_on) && all_elementwise(&right_on), + all_elementwise(&left_on, ctxt.expr_arena) && all_elementwise(&right_on, ctxt.expr_arena), InvalidOperation: "all join key expressions must be elementwise." ); diff --git a/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs b/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs index 266ac6cd3335..4bd0079a4827 100644 --- a/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs +++ b/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs @@ -123,29 +123,6 @@ impl OptimizationRule for SimpleProjectionAndCollapse { None } }, - // Remove double sorts - Sort { - input, - by_column, - slice, - sort_options: - sort_options @ SortMultipleOptions { - maintain_order: false, // `maintain_order=True` is influenced by result of earlier sorts - .. - }, - } => match lp_arena.get(*input) { - Sort { - input: inner, - slice: None, - .. - } => Some(Sort { - input: *inner, - by_column: by_column.clone(), - slice: *slice, - sort_options: sort_options.clone(), - }), - _ => None, - }, _ => None, } } diff --git a/crates/polars-plan/src/plans/optimizer/collect_members.rs b/crates/polars-plan/src/plans/optimizer/collect_members.rs index f829b3362473..c24a948f1f05 100644 --- a/crates/polars-plan/src/plans/optimizer/collect_members.rs +++ b/crates/polars-plan/src/plans/optimizer/collect_members.rs @@ -27,6 +27,9 @@ pub(super) struct MemberCollector { pub(crate) has_cache: bool, pub(crate) has_ext_context: bool, pub(crate) has_filter_with_join_input: bool, + pub(crate) has_distinct: bool, + pub(crate) has_sort: bool, + pub(crate) has_group_by: bool, #[cfg(feature = "cse")] scans: UniqueScans, } @@ -38,6 +41,9 @@ impl MemberCollector { has_cache: false, has_ext_context: false, has_filter_with_join_input: false, + has_distinct: false, + has_sort: false, + has_group_by: false, #[cfg(feature = "cse")] scans: UniqueScans::default(), } @@ -50,6 +56,15 @@ impl MemberCollector { Filter { input, .. } => { self.has_filter_with_join_input |= matches!(lp_arena.get(*input), Join { options, .. } if options.args.how.is_cross()) }, + Distinct { .. } => { + self.has_distinct = true; + }, + GroupBy { .. } => { + self.has_group_by = true; + }, + Sort { .. } => { + self.has_sort = true; + }, Cache { .. } => self.has_cache = true, ExtContext { .. } => self.has_ext_context = true, #[cfg(feature = "cse")] diff --git a/crates/polars-plan/src/plans/optimizer/mod.rs b/crates/polars-plan/src/plans/optimizer/mod.rs index 4b182b835e53..c712badfab45 100644 --- a/crates/polars-plan/src/plans/optimizer/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/mod.rs @@ -18,6 +18,7 @@ mod fused; mod join_utils; mod predicate_pushdown; mod projection_pushdown; +mod set_order; mod simplify_expr; mod slice_pushdown_expr; mod slice_pushdown_lp; @@ -34,6 +35,7 @@ use slice_pushdown_lp::SlicePushDown; pub use stack_opt::{OptimizationRule, StackOptimizer}; use self::flatten_union::FlattenUnionRule; +use self::set_order::set_order_flags; pub use crate::frame::{AllowedOptimizations, OptFlags}; pub use crate::plans::conversion::type_coercion::TypeCoercionRule; use crate::plans::optimizer::count_star::CountStar; @@ -116,6 +118,13 @@ pub fn optimize( members.collect(lp_top, lp_arena, expr_arena) } + // Run before slice pushdown + if opt_state.contains(OptFlags::CHECK_ORDER_OBSERVE) + && members.has_group_by | members.has_sort | members.has_distinct + { + set_order_flags(lp_top, lp_arena, expr_arena, scratch); + } + if simplify_expr { #[cfg(feature = "fused")] rules.push(Box::new(fused::FusedArithmetic {})); diff --git a/crates/polars-plan/src/plans/optimizer/set_order.rs b/crates/polars-plan/src/plans/optimizer/set_order.rs new file mode 100644 index 000000000000..022529fbf9a9 --- /dev/null +++ b/crates/polars-plan/src/plans/optimizer/set_order.rs @@ -0,0 +1,156 @@ +use polars_utils::unitvec; + +use super::*; + +// Can give false positives. +fn is_order_dependent_top_level(ae: &AExpr, ctx: Context) -> bool { + match ae { + AExpr::Agg(agg) => match agg { + IRAggExpr::Min { .. } => false, + IRAggExpr::Max { .. } => false, + IRAggExpr::Median(_) => false, + IRAggExpr::NUnique(_) => false, + IRAggExpr::First(_) => true, + IRAggExpr::Last(_) => true, + IRAggExpr::Mean(_) => false, + IRAggExpr::Implode(_) => true, + IRAggExpr::Quantile { .. } => false, + IRAggExpr::Sum(_) => false, + IRAggExpr::Count(_, _) => false, + IRAggExpr::Std(_, _) => false, + IRAggExpr::Var(_, _) => false, + IRAggExpr::AggGroups(_) => true, + }, + AExpr::Column(_) => matches!(ctx, Context::Aggregation), + _ => true, + } +} + +// Can give false positives. +fn is_order_dependent<'a>(mut ae: &'a AExpr, expr_arena: &'a Arena, ctx: Context) -> bool { + let mut stack = unitvec![]; + + loop { + if is_order_dependent_top_level(ae, ctx) { + return true; + } + + let Some(node) = stack.pop() else { + break; + }; + + ae = expr_arena.get(node); + } + + false +} + +// Can give false negatives. +pub(crate) fn all_order_independent<'a, N>( + nodes: &'a [N], + expr_arena: &Arena, + ctx: Context, +) -> bool +where + Node: From<&'a N>, +{ + !nodes + .iter() + .any(|n| is_order_dependent(expr_arena.get(n.into()), expr_arena, ctx)) +} + +// Should run before slice pushdown. +pub(super) fn set_order_flags( + root: Node, + ir_arena: &mut Arena, + expr_arena: &Arena, + scratch: &mut Vec, +) { + scratch.clear(); + scratch.push(root); + + let mut maintain_order_above = true; + + while let Some(node) = scratch.pop() { + let ir = ir_arena.get_mut(node); + ir.copy_inputs(scratch); + + match ir { + IR::Sort { + input, + sort_options, + .. + } => { + debug_assert!(sort_options.limit.is_none()); + // This sort can be removed + if !maintain_order_above { + scratch.pop(); + scratch.push(node); + let input = *input; + ir_arena.swap(node, input); + continue; + } + + if !sort_options.maintain_order { + maintain_order_above = false; // `maintain_order=True` is influenced by result of earlier sorts + } + }, + IR::Distinct { options, .. } => { + debug_assert!(options.slice.is_none()); + if !maintain_order_above { + options.maintain_order = false; + continue; + } + if !options.maintain_order { + maintain_order_above = false; + } + }, + IR::Union { options, .. } => { + debug_assert!(options.slice.is_none()); + options.maintain_order = maintain_order_above; + }, + IR::GroupBy { + keys, + aggs, + maintain_order, + options, + apply, + .. + } => { + debug_assert!(options.slice.is_none()); + if !maintain_order_above && *maintain_order { + *maintain_order = false; + continue; + } + + if apply.is_some() + || *maintain_order + || options.is_rolling() + || options.is_dynamic() + { + maintain_order_above = true; + continue; + } + if all_elementwise(keys, expr_arena) + && all_order_independent(aggs, expr_arena, Context::Aggregation) + { + maintain_order_above = false; + continue; + } + maintain_order_above = true; + }, + // Conservative now. + IR::HStack { exprs, .. } | IR::Select { expr: exprs, .. } => { + if !maintain_order_above && all_elementwise(exprs, expr_arena) { + continue; + } + maintain_order_above = true; + }, + _ => { + // If we don't know maintain order + // Known: slice + maintain_order_above = true; + }, + } + } +} diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs index a194a0f5434d..ab4a61639b48 100644 --- a/crates/polars-plan/src/plans/options.rs +++ b/crates/polars-plan/src/plans/options.rs @@ -46,12 +46,13 @@ pub struct FileScanOptions { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct UnionOptions { pub slice: Option<(i64, usize)>, - pub parallel: bool, // known row_output, estimated row output pub rows: (Option, usize), + pub parallel: bool, pub from_partitioned_ds: bool, pub flattened_by_opt: bool, pub rechunk: bool, + pub maintain_order: bool, } #[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)] @@ -71,6 +72,30 @@ pub struct GroupbyOptions { pub slice: Option<(i64, usize)>, } +impl GroupbyOptions { + pub(crate) fn is_rolling(&self) -> bool { + #[cfg(feature = "dynamic_group_by")] + { + self.rolling.is_some() + } + #[cfg(not(feature = "dynamic_group_by"))] + { + false + } + } + + pub(crate) fn is_dynamic(&self) -> bool { + #[cfg(feature = "dynamic_group_by")] + { + self.dynamic.is_some() + } + #[cfg(not(feature = "dynamic_group_by"))] + { + false + } + } +} + #[derive(Clone, Debug, Eq, PartialEq, Default, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct DistinctOptionsDSL { @@ -387,6 +412,7 @@ pub struct UnionArgs { pub diagonal: bool, // If it is a union from a scan over multiple files. pub from_partitioned_ds: bool, + pub maintain_order: bool, } impl Default for UnionArgs { @@ -397,6 +423,7 @@ impl Default for UnionArgs { to_supertypes: false, diagonal: false, from_partitioned_ds: false, + maintain_order: true, } } } @@ -410,6 +437,7 @@ impl From for UnionOptions { from_partitioned_ds: args.from_partitioned_ds, flattened_by_opt: false, rechunk: args.rechunk, + maintain_order: args.maintain_order, } } } diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index f0c53d0f340a..5e0ce0e2e0a4 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -494,6 +494,7 @@ impl PyLazyFrame { collapse_joins: bool, streaming: bool, _eager: bool, + _check_order: bool, #[allow(unused_variables)] new_streaming: bool, ) -> Self { let ldf = self.ldf.clone(); @@ -504,6 +505,7 @@ impl PyLazyFrame { .with_slice_pushdown(slice_pushdown) .with_cluster_with_columns(cluster_with_columns) .with_collapse_joins(collapse_joins) + .with_check_order(_check_order) ._with_eager(_eager) .with_projection_pushdown(projection_pushdown); diff --git a/py-polars/polars/functions/lazy.py b/py-polars/polars/functions/lazy.py index 759172e21bdd..7fadb8517aaa 100644 --- a/py-polars/polars/functions/lazy.py +++ b/py-polars/polars/functions/lazy.py @@ -1629,6 +1629,7 @@ def collect_all( cluster_with_columns: bool = True, collapse_joins: bool = True, streaming: bool = False, + _check_order: bool = True, ) -> list[DataFrame]: """ Collect multiple LazyFrames at the same time. @@ -1705,6 +1706,7 @@ def collect_all( collapse_joins, streaming, _eager=False, + _check_order=_check_order, new_streaming=False, ) prepared.append(ldf) @@ -1771,6 +1773,7 @@ def collect_all_async( cluster_with_columns: bool = True, collapse_joins: bool = True, streaming: bool = False, + _check_order: bool = True, ) -> Awaitable[list[DataFrame]] | _GeventDataFrameResult[list[DataFrame]]: """ Collect multiple LazyFrames at the same time asynchronously in thread pool. @@ -1870,6 +1873,7 @@ def collect_all_async( collapse_joins, streaming, _eager=False, + _check_order=_check_order, new_streaming=False, ) prepared.append(ldf) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 3f1125497c5c..fa9e10165f35 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -1023,6 +1023,7 @@ def explain( collapse_joins: bool = True, streaming: bool = False, tree_format: bool | None = None, + _check_order: bool = True, ) -> str: """ Create a string representation of the query plan. @@ -1107,6 +1108,7 @@ def explain( collapse_joins, streaming, _eager=False, + _check_order=_check_order, new_streaming=False, ) if format == "tree": @@ -1137,6 +1139,7 @@ def show_graph( cluster_with_columns: bool = True, collapse_joins: bool = True, streaming: bool = False, + _check_order: bool = True, ) -> str | None: """ Show a plot of the query plan. @@ -1202,6 +1205,7 @@ def show_graph( collapse_joins, streaming, _eager=False, + _check_order=_check_order, new_streaming=False, ) @@ -1611,6 +1615,7 @@ def profile( truncate_nodes: int = 0, figsize: tuple[int, int] = (18, 8), streaming: bool = False, + _check_order: bool = True, ) -> tuple[DataFrame, DataFrame]: """ Profile a LazyFrame. @@ -1706,6 +1711,7 @@ def profile( collapse_joins, streaming, _eager=False, + _check_order=_check_order, new_streaming=False, ) df, timings = ldf.profile() @@ -1768,6 +1774,7 @@ def collect( engine: EngineType = "cpu", background: Literal[True], _eager: bool = False, + _check_order: bool = True, ) -> InProcessQuery: ... @overload @@ -1787,6 +1794,7 @@ def collect( streaming: bool = False, engine: EngineType = "cpu", background: Literal[False] = False, + _check_order: bool = True, _eager: bool = False, ) -> DataFrame: ... @@ -1806,6 +1814,7 @@ def collect( streaming: bool = False, engine: EngineType = "cpu", background: bool = False, + _check_order: bool = True, _eager: bool = False, **_kwargs: Any, ) -> DataFrame | InProcessQuery: @@ -1974,6 +1983,7 @@ def collect( comm_subexpr_elim = False cluster_with_columns = False collapse_joins = False + _check_order = False if streaming: issue_unstable_warning("Streaming mode is considered unstable.") @@ -2005,6 +2015,7 @@ def collect( collapse_joins, streaming, _eager, + _check_order, new_streaming, ) @@ -2082,6 +2093,7 @@ def collect_async( cluster_with_columns: bool = True, collapse_joins: bool = True, streaming: bool = False, + _check_order: bool = True, ) -> Awaitable[DataFrame] | _GeventDataFrameResult[DataFrame]: """ Collect DataFrame asynchronously in thread pool. @@ -2203,6 +2215,7 @@ def collect_async( collapse_joins, streaming, _eager=False, + _check_order=_check_order, new_streaming=False, ) @@ -2858,11 +2871,13 @@ def _set_sink_optimizations( slice_pushdown: bool = True, collapse_joins: bool = True, no_optimization: bool = False, + _check_order: bool = True, ) -> PyLazyFrame: if no_optimization: predicate_pushdown = False projection_pushdown = False slice_pushdown = False + _check_order = False return self._ldf.optimization_toggle( type_coercion=type_coercion, @@ -2876,6 +2891,7 @@ def _set_sink_optimizations( collapse_joins=collapse_joins, streaming=True, _eager=False, + _check_order=_check_order, new_streaming=False, ) @@ -2950,6 +2966,7 @@ def _fetch( cluster_with_columns: bool = True, collapse_joins: bool = True, streaming: bool = False, + _check_order: bool = True, ) -> DataFrame: """ Collect a small number of rows for debugging purposes. @@ -3050,6 +3067,7 @@ def _fetch( collapse_joins, streaming, _eager=False, + _check_order=_check_order, new_streaming=False, ) return wrap_df(lf.fetch(n_rows)) diff --git a/py-polars/tests/unit/dataframe/test_df.py b/py-polars/tests/unit/dataframe/test_df.py index bfb58cdb1bd2..565fa8466e56 100644 --- a/py-polars/tests/unit/dataframe/test_df.py +++ b/py-polars/tests/unit/dataframe/test_df.py @@ -2038,7 +2038,6 @@ def test_add_string() -> None: expected = pl.DataFrame( {"a": ["hello hi", "hello there"], "b": ["hello hello", "hello world"]} ) - print(expected) assert_frame_equal(("hello " + df), expected) diff --git a/py-polars/tests/unit/lazyframe/optimizations.py b/py-polars/tests/unit/lazyframe/optimizations.py index 2417edecdeb8..969b2be15c8d 100644 --- a/py-polars/tests/unit/lazyframe/optimizations.py +++ b/py-polars/tests/unit/lazyframe/optimizations.py @@ -40,3 +40,13 @@ def test_fast_count_alias_18581() -> None: df = pl.scan_csv(f).select(pl.len().alias("weird_name")).collect() assert_frame_equal(pl.DataFrame({"weird_name": 2}), df) + + +def test_order_observability() -> None: + q = pl.LazyFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).sort("a") + + assert "SORT" not in q.group_by("a").sum().explain(_check_order=True) + assert "SORT" not in q.group_by("a").min().explain(_check_order=True) + assert "SORT" not in q.group_by("a").max().explain(_check_order=True) + assert "SORT" in q.group_by("a").last().explain(_check_order=True) + assert "SORT" in q.group_by("a").first().explain(_check_order=True) diff --git a/py-polars/tests/unit/operations/test_group_by.py b/py-polars/tests/unit/operations/test_group_by.py index 645b978214f0..924781a10bef 100644 --- a/py-polars/tests/unit/operations/test_group_by.py +++ b/py-polars/tests/unit/operations/test_group_by.py @@ -402,7 +402,7 @@ def test_group_by_sorted_empty_dataframe_3680() -> None: .sort("key") .group_by("key") .tail(1) - .collect() + .collect(_check_order=False) ) assert df.rows() == [] assert df.shape == (0, 2)