diff --git a/src/EtlExecutor.php b/src/EtlExecutor.php index f73fa36..4ed64c5 100644 --- a/src/EtlExecutor.php +++ b/src/EtlExecutor.php @@ -123,8 +123,8 @@ public function processItem(mixed $item, mixed $key, EtlState $state): void private function consumeNextTick(EtlState $state): void { foreach ($state->nextTickCallbacks as $callback) { - ($callback)($state); $state->nextTickCallbacks->detach($callback); + ($callback)($state); } } @@ -212,10 +212,14 @@ private function flush(EtlState $state, bool $early): mixed */ private function terminate(EtlState $state): EtlState { - try { - $this->consumeNextTick($state); - } catch (SkipRequest|StopRequest) { + // Ensure everything has been cleared + while (0 !== count($state->nextTickCallbacks)) { + try { + $this->consumeNextTick($state); + } catch (SkipRequest|StopRequest) { + } } + $output = $this->flush($state->getLastVersion(), false); $state = $state->getLastVersion();