Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throttle does not emit the latest element at interval occurrence when latest is true #266

Open
tarbaiev-smg opened this issue May 23, 2023 · 32 comments

Comments

@tarbaiev-smg
Copy link

tarbaiev-smg commented May 23, 2023

Unlike similar operator in Combine the throttle's latest parameter is confusing, as it does not emit any latest, but a next element instead.

When latest parameter is set to true

Actual behavior:

The throttle emits the next element from the base sequence, if any, after the interval occurs.
Meaning if there's no new elements emitted after the interval occurrence, the latest element is just skipped, leading to only the first element being emitted.

Expected behavior:

The throttle emits the latest cached element instantly when the interval occurs.

@tarbaiev-smg tarbaiev-smg changed the title throttle does not emit the latest element upon interval occurrence and latest is true throttle does not emit the latest element at interval occurrence and latest is true May 23, 2023
@tarbaiev-smg tarbaiev-smg changed the title throttle does not emit the latest element at interval occurrence and latest is true throttle does not emit the latest element at interval occurrence when latest is true May 23, 2023
@notcome
Copy link

notcome commented Aug 9, 2023

I think the semantics is very unclear on this one. Let's say we publish 0...99 every 100ms with a throttling window of 1 sec.

  • When latest = false we have 0, 1, 11, 21, ..., 81.
  • When latest = true we have 0, 10, 20, 30, ..., 90.

That feels very strange. I agree that at least there should be an option to produce the last element, if this element was initially skipped in the throttling window. But is there a clear semantics for this concept?

@tarbaiev-smg
Copy link
Author

tarbaiev-smg commented Aug 9, 2023

@notcome given the Combine and Swift Concurrency interoperability and switching from Publisher to AsyncSequence, I think AsyncAlgorithms's behavior should be consistent with Combine as much as possible.
I had to switch to the Combine's throttle because of this flaw. If I had an AsyncSequence instead of a Publisher, I would need to convert it to a Publisher, apply throttle and convert back to AsyncSequence, which is annoying and inefficient.

@notcome
Copy link

notcome commented Aug 10, 2023

@tarbaiev-smg So I did a little search on RxSwift, and it seems that something as simple as throttle actually has lots of subtle variations.

Anyway, it seems that adding a clock task, which is required to publish the latest element after signal stabilizing, is very tricky. The current throttle implementation has less than a hundred lines of code, whereas bounce spans near 2k LOC. Stunning…

I can't use Combine here because I am moving to concurrency infras for testing. So I end up focusing on my particular case — infinite demand, no error handling — and get away with a very simple actor. It's quite tricky though, Task.init captures your actor without warning (you don't need to write explicit self) and gets cancellation right is hard.

@Kolos65
Copy link

Kolos65 commented Aug 16, 2023

One would indeed expect to receive the last element when using throttle. A very common use case for example is to throttle a sequence of progress values where receiving the last element (i.e. 100%) is crucial.

In this example:

var numbers = Array((0...100).reversed())
let stream = AsyncStream {
    try! await Task.sleep(nanoseconds: 50_000_000)
    return numbers.popLast()
}

Task {
    let sequence = stream
        .throttle(for: .seconds(1))
    for try await num in sequence {
        print(num)
    }
}

The output is:

0
19
38
58
77
96

Which is unexpected as the latest element is 100.

The problem is that in the AsyncThrottleSequence implementation we can wait for the next element longer then interval which can result in the latest element being 'stuck'. Wouldn't it make sense to add a timeout when awaiting the base sequence's next() so that the latest element is always emitted after interval amount of time passed?

public mutating func next() async rethrows -> Reduced? {
    var reduced: Reduced?
    let start = last ?? clock.now
    repeat {
        // Return reduced if we wait longer then interval (and reduced was not returned previously)
        guard let element = try await base.next() else {
            return nil
        }
        let reduction = await reducing(reduced, element)
        let now = clock.now
        if start.duration(to: now) >= interval || last == nil {
            last = now
            return reduction
        } else {
            reduced = reduction
        }
    } while true
}

@kdubb
Copy link

kdubb commented Sep 3, 2023

I also find throttle's implementation confusing. In addition to the behavior of 'latest', given infinite demand, the loop consumes available processing. For example an AsyncSequence that is simply a counter returning immediately the next iteration's value.

Maybe I am expecting something I shouldn't but throttling by rapidly consuming a stream's elements, only to discard them, seems incorrect. Throttling should handle the case of over and under producing streams without a severe performance penalty in either case.

@kdubb
Copy link

kdubb commented Sep 3, 2023

Just as a use case, I attempted to use throttle to sample the audio level from an AVAudioRecorder; which immediately returns the current value. During a simple test I realized it was consuming values at an unchecked pace.

@FranzBusch
Copy link
Member

@kdubb I think that is the expected behaviour from throttle though. It will apply infinite demand as long as it has demand on itself. If you have a root asynchronous sequence that produces infinite values instantly then throttle will produce exactly one value at every interval.

I think you want another algorithm for what you describe here.

@kdubb
Copy link

kdubb commented Sep 6, 2023

The issue for me is that I was switching from a slow producing stream to an infinite producing stream and you need something that can deal with both cases. I attempted to use timer sequence and withLatest (from AsyncExtensnions) but that has the same issue with infinite streams (it loops in a similar way to throttle).

Finally, I flatMapd the timer sequence to the infinite stream to get a reasonable implementation but this all required knowing exactly the production timing of each stream.

To me the name throttle is expressly saying that it is reducing the production of the source stream; but currently it's not. If that just not how it's ever going to work then you are correct we need another algorithm.

Maybe we create one called 'polling' that uses a Clock to sleep until it acquires the next element from the source stream; and if calling next takes too long it immediately pulls. But if we have polling that works as I described, what use would there be for throttle?

@kdubb
Copy link

kdubb commented Sep 6, 2023

Also, adding a doc - Note: to this, and any other algorithms that will runaway with an infinitely supplying stream, would help users with their expectation and avoid unexpected performance issues when using these algorithms.

@FranzBusch
Copy link
Member

@phausler just put up a PR #292 to change the behaviour. We would appreciate if you all could check that it is doing what you expected it to do now.

@tarbaiev-smg
Copy link
Author

@FranzBusch Thanks for the update! However the PR does not seem to be addressing this issue. The AsyncThrottleSequence.next() is still awaiting the next element of its base sequence even after the interval ends. This means it won't emit an actual last element of the last interval, but will emit the first element of the subsequent interval instead.

@Kolos65
Copy link

Kolos65 commented Sep 18, 2023

Yes, while the PR addresses issues in cases where the sequence finishes (by returning nil), but as @tarbaiev-smg said, we can still await for the next element longer than interval which means that the last element wont be emitted.

Here is an updated example of the issue that uses a sequence that waits for the next element longer and is not finishing:

var numbers = Array((0...200).reversed())
let stream = AsyncStream<Int> {
   guard let number = numbers.popLast() else { return nil }
   if number == 101 {
        try! await Task.sleep(nanoseconds: 1_000_000_000_000)
    } else {
        try! await Task.sleep(nanoseconds: 50_000_000)
    }
    return number
}

Task {
    let sequence = stream
        .throttle(for: .seconds(1))
    for try await num in sequence {
        print(num)
    }
}

prints:

0
20
39
58
78
97
<long break>
<remaining numbers>

instead of:

0
20
39
58
78
97
100
<long break>
<remaining numbers>

@FranzBusch
Copy link
Member

@Kolos65 Thanks for the example. Could you do me a favour an try to write a validation test that represents what currently is not working like you expect it to work. Validation tests look like this

    validate {
      "abcdefghijk|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: false)
      "a--b--e--h--[k|]"
    }

I am just trying to nail down if we have a problem and what it exactly is and that we are not mixing semantics of debounce with throttle here.

@pyrtsa
Copy link

pyrtsa commented Sep 20, 2023

Could you do me a favour an try to write a validation test that represents what currently is not working like you expect it to work.

Let me help there. I believe this validation test should pass but it doesn't:

    validate {
       "ab--cd-e-f---gh|"
       $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
       "a--b--d--f---g--[h|]"
     }

@Kolos65
Copy link

Kolos65 commented Sep 20, 2023

@FranzBusch Here is one I think matches my example:

    validate {
      "-a-b-c-d-e-f-----h-i-j-k-|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
      "-a---c---e---f---h---j---|"
    }

actual: -a---c---e-------h---j---|

@pyrtsa
Copy link

pyrtsa commented Sep 20, 2023

I'm sorry @Kolos65, but I think there is a bug in that. Throttle should reset its timer for each output it yields, not input. If the timer lapses without further input, then the next input after that should be yielded immediately just like the initial input. Like so:

    validate {
      "-a-b-c-d-e-f-----h-i-j-k-|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
      "-a--b--d--e--f---h--i--k-|"
    }

Notice how there's an output every 3 steps for as long as there's input at least as often; otherwise the gap between outputs can be longer than 3 but no more than necessary.

@phausler
Copy link
Member

after looking into this pretty extensively; the outcome is this - I am currently in favor of removing throttle and deferring it out of the 1.0 since that is the only real remaining task at hand. And to be quite honest there is a lot of discordant expectations for what it ought to do.

I agree that the last element was definitely a bug, but the other issues stated have to do with how the interval is counted and for what context it is used. One confusing factor is that all AsyncSequences are applied as a demand of 1; in that each call to next is what drives the production of values. This comes in conflict when measuring time to the next value or the time to the previous value. Quite honestly; it doesn't seem like we have a good consensus on how that should operate.

The current implementation is rather straightforward but definitely exposes some behavioral aspects that might be surprising. But it definitely clashes easily on: "is this really a variation on a lower level thing that conjoins the debounce and throttle machinery?" or "how does this effect sendability?" or "does this secretly have a buffer under the hood?".

I welcome more clarity here; but the murkier it gets the more likely it seems that this operation (albeit currently rather simple) is not ready.

@FranzBusch
Copy link
Member

I agree with this. Let's remove throttle from 1.0.0 and brainstorm again exactly what semantics we want to implement here.

@tarbaiev-smg
Copy link
Author

A side topic, but I was somewhat surprised to see a 1.0.0 release already. There is no description or a changelog for it.
Just wondering, are there some breaking changes to justify the major increment or was it more of a milestone thing?
I'd personally like to see 1.0.0 when we already have feature parity with Combine (at least its first version as of iOS 13 SDK), and throttle is one thing which is missing currently and is very demanded by UI app developers.

@FranzBusch
Copy link
Member

We have been working on a 1.0.0 for quite some time now and are just going through the final open issues that are blocking the release. The goal for 1.0.0 is not feature parity or providing all possible algorithms but to have a baseline of algorithms that are working and functional.
There are many more algorithms that we want to support but the 1.0.0 is important for us to settle the foundation. Furthermore, it is important for the ecosystem since nobody can really depend on this package before the 1.0.0 is out.

@tarbaiev-smg
Copy link
Author

tarbaiev-smg commented Sep 21, 2023

One confusing factor is that all AsyncSequences are applied as a demand of 1; in that each call to next is what drives the production of values. This comes in conflict when measuring time to the next value or the time to the previous value. Quite honestly; it doesn't seem like we have a good consensus on how that should operate.

As I see it, based on Combine's throttle, the demand of 1 should be fine. We need to:

  1. await for the next value and the interval timeout in two parallel tasks
  2. when receiving a new value, just store it in a variable and override with new values if the latest is true
  3. when the interval ends, return the stored value or start over, if there's no value to return

Maybe we don't even need to measure time this way 🤔, I don't think we need to be very precise here fighting a race between the timeout and next value. I could try to craft a possible implementation.

@mipstian
Copy link

I too was/am expecting Combine's behavior!

If all I do with the base sequence is send two values immediately, I'd expect the throttled sequence to return the first immediately, and the second after one interval is elapsed. The implementation in 1.0.0 returns the first immediately and effectively drops the second.

@bobspryn
Copy link

Is there any update on throttle? Just was surprised by missing our last element as well when attempting to use it in a non-terminating stream.

@fanwgwg
Copy link

fanwgwg commented Jun 20, 2024

+1 Any updates on this issue so far?

@drewmccormack
Copy link

I think part of the problem is the overloading of terms like "throttle". I see it used to mean very different things, and the discussion above demonstrates it quite well. I think it would probably be best, given how loaded the term is, to not use it at all, coming up with new functions that are clearer.

To me, in the real world, throttling is about changing the rate of processing. In an engine, you can increase or decrease the revs by changing the throttle. What you never do is "drop" revs (unless your car has issues).

What I expected from "throttle" was that it would never drop anything in the sequence, but simply delay it, in order to maintain a maximum density of iteration. This apparently is not what it does. It seems that it indeed drops items.

What I would like to see are three different operators...

debounce - which waits for a pause in the stream before emitting the latest value, dropping anything in between.
spread - which never drops anything, but ensures that they are emitted with a maximum time density. It effectively queues things up, but slows down processing.
slice - which is currently throttle, but would need to produce the latest value at the end of the interval as discussed earlier

@tarbaiev-smg
Copy link
Author

@drewmccormack

debounce - which waits for a pause in the stream before emitting the latest value, dropping anything in between. spread - which never drops anything, but ensures that they are emitted with a maximum time density. It effectively queues things up, but slows down processing. slice - which is currently throttle, but would need to produce the latest value at the end of the interval as discussed earlier

There are only two hard problems in computer science: naming and cache invalidation 😉.
As confusing as the term throttle is, it's a well established term in reactive frameworks on Apple platforms. Replacing it with something else would make things even more confusing. And as I mentioned, Combine.Publisher and AsyncSequence essentially represent same concept in same ecosystem, so it makes sense to keep the naming aligned.
throttle with the last element is the most demanded of the similar ones, as it's often used in UI applications.

@drewmccormack
Copy link

I'm afraid I think this is part of the problem. Apple should have come up with their own naming, instead of adopting things from half baked Facebook frameworks and academic work half a century old. They had an opportunity, and they missed it.

The naming is quite alienating to most app-level developers. Sure, if you are right into functional or reactive programming, it probably makes sense, but that is not where Apple comes from. Terms like 'debounce' mean very little to someone who has been using UIKit or AppKit (ie most developers).

The discussion above demonstrates it quite well IMO. It is pretty clear people have different ideas about what throttle means, and the accepted terminology actually makes no sense. It was a poor choice to begin with.

But even if "throttle" is the accepted term, it should be very clear that it drops items in the stream. That should at least be stated in the docs, and not left up to how much experience of the reader has with ReactNative or whatever 3rd party framework is using it.

And as far as I can tell, there is still another type of function — a true throttle that doesn't drop items — which is missing. Perhaps it is there, but I can't find it. I assumed "throttle" would be it, but seems it is missing.

@Gernot
Copy link

Gernot commented Sep 17, 2024

Is there any roadmap for when there'll be a solution for this? I hoped that there was a 1.1 release planned in sync with swift 6, but it seems it doesn't work like this.
I don't really care how it's named, I just want an async function that lets me limit the number of values per time and let's me chose which one it filters – without omitting the last value. I can't understand why this is an issue that's open so long, if the code is done but the naming isn't.

@phausler
Copy link
Member

it isn't per se a naming issue but instead a focus of folks working on it; if you want to take this up and drive a discussion on the forums to resolve it id welcome that!

@mattgallagher
Copy link

mattgallagher commented Jan 5, 2025

I had a quick look at this issue (since I'd like to have it resolved for one of my projects).

Looking at the _AsyncThrottleSequence.Iterator.next in https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncThrottleSequence.swift

On each loop of the repeat the scenario is either:

  • no cached value in the throttle reducer (in which case you just want the existing try await base.next())
  • cached values in the throttle reducer (in which case you want whichever completes first out of: same as above OR sleeping until the throttle window elapses).

(the current implementation handles only the first case, not the second - which is the source of this issue)

The complication though, is that in the case where the sleep elapses first, we don't want to cancel the next() call (since that would discard one of the stream values). Instead, we would want to keep holding the Task that called base.next() around for the next iteration.

But Swift concurrency can't do this. None of the Swift structured concurrency components (Task, TaskGroup, etc) will let us move forward unless the task completes but the task is completely deadlocked on next().

What we would need is some way to cancel try await someTask.value WITHOUT someTask completing. Either a timeout or responding to the cancellation of the enclosing task. Without the ability to move forward (and ignoring the fact that someTask is still running) we're stuck.

I think this kind of interruption is what was requested here: https://forums.swift.org/t/why-doesnt-await-task-value-set-up-a-cancellation-handler/57740 but it seems like the need for it wasn't understood so it wasn't implemented.

I can definitely see how this might be implemented with a DispatchSemaphore but these have been avoided in Swift Concurrency. I think either a semaphore (or changes to the Swift standard library around Task.value) is required to make this work. But I suspect that DispatchSemaphore is not the right kind of code to submit to this repo.

@mattgallagher
Copy link

mattgallagher commented Jan 5, 2025

Okay, perhaps there is a way to do this. My problem above was trying to solve the problem as a TaskGroup. But using Swift continuations, we have a lot more flexibility. It requires some unstructured child tasks to do it but it seems to work? It's not exactly simple, though.

Curious to know if anyone has any thoughts about this approach. It's a drop-in replacement for _AsyncThrottleSequence.Iterator in AsyncThrottleSequence.swift.

    /// The iterator for an `AsyncThrottleSequence` instance.
    public struct Iterator: AsyncIteratorProtocol {
        var base: Base.AsyncIterator
        
        var last: C.Instant?
        let interval: C.Instant.Duration
        let clock: C
        let reducing: @Sendable (Reduced?, Base.Element) async -> Reduced
        var baseNextTask: Task<Base.Element?, Error>?
        
        enum Emit {
            case timer
            case next(Base.Element?)
        }
        
        enum State {
            case waiting(CheckedContinuation<Emit, Error>)
            case notWaiting(Result<Emit, Error>?)
        }
        
        init(_ base: Base.AsyncIterator, interval: C.Instant.Duration, clock: C, reducing: @Sendable @escaping (Reduced?, Base.Element) async -> Reduced) {
            self.base = base
            self.interval = interval
            self.clock = clock
            self.reducing = reducing
        }
        
        public mutating func next() async rethrows -> Reduced? {
            var reduced: Reduced?
            let start = last ?? clock.now
            let result = Mutex<State>(.notWaiting(nil))
            repeat {
                var timerTask: Task<Void, Error>?
                do {
                    let emit = try await withTaskCancellationHandler {
                        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Emit, Error>) in
                            result.withLock { result in
                                // Check if a previous iteration of the repeat loop has emitted a value
                                if case .notWaiting(let emittedResult?) = result {
                                    result = .notWaiting(nil)
                                    continuation.resume(with: emittedResult)
                                    return
                                } else {
                                    // Common case is that we start waiting
                                    result = .waiting(continuation)
                                }
                            }
                            
                            if reduced != nil, let last {
                                // If we already have a cached reduction start a timer so we can just emit what we already
                                // have when the timer elapses
                                timerTask = Task { [interval, last, clock] in
                                    let amount = interval - last.duration(to: clock.now)
                                    if amount > .zero {
                                        try await clock.sleep(until: clock.now.advanced(by: amount), tolerance: nil)
                                    }
                                    result.withLock { result in
                                        // In all cases, emit only if we won the race
                                        guard case .waiting(let continuation) = result else { return }
                                        result = .notWaiting(nil)
                                        continuation.resume(with: .success(Emit.timer))
                                        return
                                    }
                                }
                            }
                            
                            if reduced == nil, let baseNextTask, !baseNextTask.isCancelled {
                                // If we have no cached reduction and have a baseNextTask from a previous iteration
                                // Just wait on the baseNextTask since there's no other source of values
                                Task {
                                    let element = try await baseNextTask.value
                                    result.withLock { result in
                                        guard case .waiting(let continuation) = result else { return }
                                        result = .notWaiting(nil)
                                        continuation.resume(with: .success(.next(element)))
                                        
                                        // We can't mutate Iterator.baseNextTask back to nil from here so mark the task
                                        // as cancelled, which should be considered equivalent.
                                        baseNextTask.cancel()
                                        return
                                    }
                                }
                            } else if baseNextTask?.isCancelled ?? true {
                                // We have no "next" task so start a new one
                                baseNextTask = Task<Base.Element?, Error> { [base] in
                                    var base = base
                                    do {
                                        let element = try await base.next()
                                        result.withLock { result in
                                            switch result {
                                            case .waiting(let continuation):
                                                result = .notWaiting(nil)
                                                continuation.resume(with: .success(.next(element)))
                                            case .notWaiting:
                                                result = .notWaiting(.success(.next(element)))
                                            }
                                            
                                            // We can't mutate Iterator.baseNextTask back to nil from here so mark the task
                                            // as cancelled, which should be considered equivalent.
                                            withUnsafeCurrentTask { task in task?.cancel() }
                                            return
                                        }
                                        return element
                                    } catch {
                                        result.withLock { result in
                                            switch result {
                                            case .waiting(let continuation):
                                                result = .notWaiting(nil)
                                                continuation.resume(with: .failure(error))
                                            case .notWaiting:
                                                result = .notWaiting(.failure(error))
                                            }

                                            // We can't mutate Iterator.baseNextTask back to nil from here so mark the task
                                            // as cancelled, which should be considered equivalent.
                                            withUnsafeCurrentTask { task in task?.cancel() }
                                            return
                                        }
                                        throw error
                                    }
                                }
                            }
                        }
                    } onCancel: {
                        result.withLock { result in
                            // On cancellation, immediately complete the continuation (if possible)
                            guard case .waiting(let continuation) = result else { return }
                            result = .notWaiting(nil)
                            continuation.resume(throwing: CancellationError())
                        }
                    }
                    
                    // Clean up our unstructured tasks
                    timerTask?.cancel()
                    if let baseNextTask, baseNextTask.isCancelled {
                        self.baseNextTask = nil
                    }
                    
                    switch emit {
                    case .timer:
                        // Emit cached values when the timer fires
                        last = clock.now
                        return reduced
                    case .next(let element):
                        if let element {
                            // Reduce and cache the value
                            reduced = await reducing(reduced, element)

                            // Choose to either emit the new value or continue
                            // accumulating
                            let now = clock.now
                            if start.duration(to: now) >= interval || last == nil {
                                last = now
                                return reduced
                            }
                        } else {
                            // End of stream.
                            if reduced != nil, let last {
                                // If we're going to emit additional values, make sure we've waited enough
                                // since the previous values
                                let amount = interval - last.duration(to: clock.now)
                                if amount > .zero {
                                    try await clock.sleep(until: clock.now.advanced(by: amount), tolerance: nil)
                                }
                            }
                            return reduced
                        }
                    }
                } catch is CancellationError {
                    // Be sure to cancel unstructured tasks on the way out
                    timerTask?.cancel()
                    baseNextTask?.cancel()
                    return nil
                }
            } while true
        }
    }

@tarbaiev-smg
Copy link
Author

@mattgallagher Looks promising. Although it's hard to tell for sure without a comprehensive amount of tests.
Let's submit a PR and see what maintainers say?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests