Skip to content

Commit

Permalink
add reset to ExecutionPlan.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackson Newhouse committed Mar 5, 2024
1 parent bf6f83b commit da895f3
Show file tree
Hide file tree
Showing 49 changed files with 283 additions and 3 deletions.
4 changes: 4 additions & 0 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,8 @@ impl ExecutionPlan for CustomExec {
None,
)?))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ impl ExecutionPlan for ArrowExec {
fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_statistics.clone())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

pub struct ArrowOpener {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ impl ExecutionPlan for AvroExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

#[cfg(feature = "avro")]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ impl ExecutionPlan for CsvExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// A Config for [`CsvOpener`]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ impl ExecutionPlan for NdJsonExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ impl ExecutionPlan for ParquetExec {
fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_statistics.clone())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// Implements [`FileOpener`] for a parquet file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,10 @@ pub(crate) mod tests {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

pub(crate) fn schema() -> SchemaRef {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ impl ExecutionPlan for OutputRequirementExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

impl PhysicalOptimizerRule for OutputRequirements {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,10 @@ mod tests {
) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

// Produces an execution plan where the schema is mismatched from
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ impl ExecutionPlan for StatisticsExec {
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

pub mod object_store;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ impl ExecutionPlan for UnboundedExec {
batch: self.batch.clone(),
}))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[derive(Debug)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ impl ExecutionPlan for CustomExecutionPlan {
.collect(),
})
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ impl ExecutionPlan for CustomPlan {
// but we want to test the filter pushdown not the CBOs
Ok(Statistics::new_unknown(&self.schema()))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[derive(Clone)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl ExecutionPlan for StatisticsValidation {
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ impl DisplayAs for TopKExec {
}
}

#[async_trait]
impl ExecutionPlan for TopKExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -494,6 +493,10 @@ impl ExecutionPlan for TopKExec {
// better statistics inference could be provided
Ok(Statistics::new_unknown(&self.schema()))
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

// A very specialized TopK implementation
Expand Down
9 changes: 9 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ impl ExecutionPlan for AggregateExec {
}
}
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

fn create_schema(
Expand Down Expand Up @@ -1695,6 +1700,10 @@ mod tests {
None,
))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

/// A stream using the demo data. If inited as new, it will first yield to runtime before returning records
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl ExecutionPlan for AnalyzeExec {
futures::stream::once(output),
)))
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

/// Creates the ouput of AnalyzeExec as a RecordBatch
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

struct CoalesceBatchesStream {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,10 @@ mod tests {
)),
}
}

fn reset(&self) -> datafusion_common::Result<()> {
Ok(())
}
}

fn test_stats_display(exec: TestStatsExecPlan, show_stats: bool) {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl ExecutionPlan for EmptyExec {
None,
))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ impl ExecutionPlan for ExplainExec {
futures::stream::iter(vec![Ok(record_batch)]),
)))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

/// If this plan should be shown, given the previous plan that was
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ impl ExecutionPlan for FilterExec {
column_statistics,
})
}

fn reset(&self) -> Result<()> {
todo!()
}
}

/// This function ensures that all bounds in the `ExprBoundaries` vector are
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ impl ExecutionPlan for FileSinkExec {
stream,
)))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

/// Create a output record batch with a count
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ impl ExecutionPlan for CrossJoinExec {
self.right.statistics()?,
))
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.left_fut.reset()?;
self.left.reset()?;
self.right.reset()
}
}

/// [left/right]_col_count are required in case the column statistics are None
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,14 @@ impl ExecutionPlan for HashJoinExec {
&self.schema,
)
}

fn reset(&self) -> Result<()> {
self.left_fut.reset()?;
self.metrics.reset();
self.left.reset()?;
self.right.reset()?;
Ok(())
}
}

/// Reads the left (build) side of the input, buffering it in memory, to build a
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ impl ExecutionPlan for NestedLoopJoinExec {
&self.schema,
)
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.inner_table.reset()?;
self.left.reset()?;
self.right.reset()
}
}

// For the nested loop join, different join type need the different distribution for
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ impl ExecutionPlan for SortMergeJoinExec {
&self.schema,
)
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.left.reset()?;
self.right.reset()
}
}

/// Metrics for SortMergeJoinExec
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
reservation,
}))
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.left.reset()?;
self.right.reset()
}
}

/// A stream that issues [RecordBatch]es as they arrive from the right of the join.
Expand Down
10 changes: 10 additions & 0 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,16 @@ impl<T: 'static> OnceAsync<T> {
.get_or_insert_with(|| OnceFut::new(f()))
.clone()
}

pub(crate) fn reset(&self) -> Result<()> {
let fut = self.fut.lock();
if fut.is_none() {
return Ok(());
}
Err(DataFusionError::Internal(
"OnceAsync::reset can't be called once it started running".to_string(),
))
}
}

/// The shared future type used internally within [`OnceAsync`]
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}

fn reset(&self) -> Result<()>;
}

/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
Expand Down
Loading

0 comments on commit da895f3

Please sign in to comment.