Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Jan 10, 2025
1 parent b69dbdd commit bc09fb1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
17 changes: 10 additions & 7 deletions crates/polars-stream/src/physical_plan/lower_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ pub fn is_input_independent_rec(
ret
}

pub fn is_input_independent(expr_key: ExprNodeKey, expr_arena: &Arena<AExpr>, cache: &mut ExprCache) -> bool {
is_input_independent_rec(
expr_key,
expr_arena,
&mut cache.is_input_independent,
)
/// Convenience wrapper around [`is_input_independent_rec`] for callers that
/// hold an [`ExprCache`].
///
/// Forwards `expr_key` and `expr_arena` unchanged and passes the cache's
/// `is_input_independent` field as the mutable cache argument, so results
/// recorded by the recursive helper live in `cache`.
///
/// Returns whatever `is_input_independent_rec` returns for `expr_key`.
pub fn is_input_independent(
expr_key: ExprNodeKey,
expr_arena: &Arena<AExpr>,
cache: &mut ExprCache,
) -> bool {
is_input_independent_rec(expr_key, expr_arena, &mut cache.is_input_independent)
}

fn is_input_independent_ctx(expr_key: ExprNodeKey, ctx: &mut LowerExprContext) -> bool {
Expand Down Expand Up @@ -687,7 +687,10 @@ fn build_select_stream_with_ctx(
exprs: &[ExprIR],
ctx: &mut LowerExprContext,
) -> PolarsResult<PhysStream> {
if exprs.iter().all(|e| is_input_independent_ctx(e.node(), ctx)) {
if exprs
.iter()
.all(|e| is_input_independent_ctx(e.node(), ctx))
{
return Ok(PhysStream::first(build_input_independent_node_with_ctx(
exprs, ctx,
)?));
Expand Down
44 changes: 28 additions & 16 deletions crates/polars-stream/src/physical_plan/lower_group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use slotmap::SlotMap;

use super::lower_expr::{is_elementwise_rec_cached, lower_exprs};
use super::{ExprCache, PhysNode, PhysNodeKey, PhysNodeKind, PhysStream};
use crate::physical_plan::lower_expr::{build_select_stream, compute_output_schema, is_input_independent, is_input_independent_rec, unique_column_name};
use crate::physical_plan::lower_expr::{
build_select_stream, compute_output_schema, is_input_independent, is_input_independent_rec,
unique_column_name,
};
use crate::utils::late_materialized_df::LateMaterializedDataFrame;

fn build_group_by_fallback(
Expand Down Expand Up @@ -117,13 +120,12 @@ fn try_lower_elementwise_scalar_agg_expr(
| AExpr::Sort { .. }
| AExpr::SortBy { .. }
| AExpr::Gather { .. } => None,

// Explode and filter are row-separable and should thus in theory work
// in a streaming fashion but they change the length of the input which
// means the same filter/explode should also be applied to the key
// column, which is not (yet) supported.
AExpr::Explode(_)
| AExpr::Filter { .. } => None,
AExpr::Explode(_) | AExpr::Filter { .. } => None,

AExpr::BinaryExpr { left, op, right } => {
let (left, op, right) = (*left, *op, *right);
Expand Down Expand Up @@ -197,7 +199,7 @@ fn try_lower_elementwise_scalar_agg_expr(
let trans_agg_node = expr_arena.add(AExpr::Agg(trans_agg));

// Add to aggregation expressions and replace with a reference to its output.

let agg_expr = if let Some(name) = outer_name {
ExprIR::new(trans_agg_node, OutputName::Alias(name))
} else {
Expand Down Expand Up @@ -251,13 +253,15 @@ fn try_build_streaming_group_by(
}

if keys.len() == 0 {
return Some(Err(polars_err!(ComputeError: "at least one key is required in a group_by operation")));
return Some(Err(
polars_err!(ComputeError: "at least one key is required in a group_by operation"),
));
}


let all_independent = keys.iter().chain(aggs.iter()).all(|expr|
is_input_independent(expr.node(), expr_arena, expr_cache)
);
let all_independent = keys
.iter()
.chain(aggs.iter())
.all(|expr| is_input_independent(expr.node(), expr_arena, expr_cache));
if all_independent {
return None;
}
Expand Down Expand Up @@ -295,10 +299,13 @@ fn try_build_streaming_group_by(
// substituting the translated input columns and extracting the aggregate
// expressions.
let mut trans_agg_exprs = Vec::new();
let mut trans_output_exprs = keys.iter().map(|key| {
let key_node = expr_arena.add(AExpr::Column(key.output_name().clone()));
ExprIR::from_node(key_node, expr_arena)
}).collect_vec();
let mut trans_output_exprs = keys
.iter()
.map(|key| {
let key_node = expr_arena.add(AExpr::Column(key.output_name().clone()));
ExprIR::from_node(key_node, expr_arena)
})
.collect_vec();
for agg in aggs {
let trans_node = try_lower_elementwise_scalar_agg_expr(
agg.node(),
Expand All @@ -311,9 +318,14 @@ fn try_build_streaming_group_by(
)?;
trans_output_exprs.push(ExprIR::new(trans_node, agg.output_name_inner().clone()));
}

let input_schema = &phys_sm[trans_input.node].output_schema;
let group_by_output_schema = compute_output_schema(input_schema, &[trans_keys.clone(), trans_agg_exprs.clone()].concat(), expr_arena).unwrap();
let group_by_output_schema = compute_output_schema(
input_schema,
&[trans_keys.clone(), trans_agg_exprs.clone()].concat(),
expr_arena,
)
.unwrap();
let agg_node = phys_sm.insert(PhysNode::new(
group_by_output_schema,
PhysNodeKind::GroupBy {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream};
use crate::physical_plan::lower_expr::{
build_select_stream, is_elementwise_rec_cached, lower_exprs, ExprCache,
};
use crate::physical_plan::lower_group_by::{build_group_by_stream};
use crate::physical_plan::lower_group_by::build_group_by_stream;

/// Creates a new PhysStream which outputs a slice of the input stream.
fn build_slice_stream(
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use polars_plan::prelude::expr_ir::ExprIR;

mod fmt;
mod lower_expr;
mod lower_ir;
mod lower_group_by;
mod lower_ir;
mod to_graph;

pub use fmt::visualize_plan;
Expand Down

0 comments on commit bc09fb1

Please sign in to comment.