From fff179d9a67d0c350fcd8a596d3f42ea19764a71 Mon Sep 17 00:00:00 2001 From: Claudio Jolowicz Date: Fri, 23 Aug 2024 12:49:43 +0200 Subject: [PATCH] Fix allocation failures during sorting with spill-to-disk This change replaces `try_resize` with `resize` in three sites, allowing memory to overshoot the configured pool size. These are sites where we don't fall back to spilling to disk when the allocation fails. Fixes: #12136 --- datafusion/physical-plan/src/sorts/builder.rs | 3 ++- datafusion/physical-plan/src/sorts/sort.rs | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index 3527d5738223..1ca5d477c8c2 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -68,7 +68,8 @@ impl BatchBuilder { /// Append a new batch in `stream_idx` pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> { - self.reservation.try_grow(batch.get_array_memory_size())?; + // Allow memory to exceed the limit + self.reservation.grow(batch.get_array_memory_size()); let batch_idx = self.batches.len(); self.batches.push((stream_idx, batch)); self.cursors[stream_idx] = BatchCursor { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 2d8237011fff..8b40fb1f3cc3 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -421,7 +421,8 @@ impl ExternalSorter { // Reserve headroom for next sort/merge self.reserve_memory_for_merge()?; - self.reservation.try_resize(size)?; + // Allow memory to exceed limit temporarily before spill + self.reservation.resize(size); self.in_mem_batches_sorted = true; Ok(()) } @@ -559,7 +560,8 @@ impl ExternalSorter { if self.runtime.disk_manager.tmp_files_enabled() { let size = self.sort_spill_reservation_bytes; if self.merge_reservation.size() != size { - self.merge_reservation.try_resize(size)?; + // Allow memory to exceed the limit + self.merge_reservation.resize(size); } }