diff --git a/src/KurtResult.ts b/src/KurtResult.ts index 5d8aec0..70c08b6 100644 --- a/src/KurtResult.ts +++ b/src/KurtResult.ts @@ -33,32 +33,15 @@ export class KurtResult private started: boolean = false private finished: boolean = false private seenEvents: KurtResultEvent[] = [] + private finalError?: { uncaughtError: unknown } private additionalListeners = new Set<_AdditionalListener>() - private finalEventValue?: KurtResultEventFinal - private finalEventPromise: Promise> - private finalEventResolve!: (e: Promisable>) => void - private finalEventReject!: (reason?: unknown) => void // Create a new result stream, from the given underlying stream generator. - constructor(private gen: AsyncGenerator>) { - this.finalEventPromise = new Promise((resolve, reject) => { - this.finalEventResolve = resolve - this.finalEventReject = reject - }) - } + constructor(private gen: AsyncGenerator>) {} // Get the final event from the end of the result stream, when it is ready. get finalEvent(): Promise> { - return this._finalEvent() - } - private async _finalEvent() { - // If nobody has started listening to the stream yet, we need to be - // the first listener of it. This makes sure we will eventually fulfill - // the promise that we are about to await on. - if (!this.started) for await (const _ of this) null - - // Now wait for the underlying promise to fulfill. - return this.finalEventPromise + return toFinal(this) } // Get the text from the end of the result stream, when it is ready. @@ -93,35 +76,31 @@ export class KurtResult this.started = true try { for await (const event of this.gen) { - // Yield the event, capture it in our internal event history, + // Capture the event in our internal event history, // and push it to any additional listeners who are waiting. - yield event this.seenEvents.push(event) for (const listener of this.additionalListeners) listener(event) - // If this is the final event, store it, resolve the final promise, - // and break out of the event-receiving loop. + // We need to yield the event *AFTER* we've done the bookkeeping above, + // because our caller may decide to stop calling the generator at + // any time, and we wouldn't want that to break other listeners. + yield event + + // If this is the final event, break out of the event-receiving loop. // // If we have a misbehaving underlying generator that has more events // after the finish event, we'll be ignoring those. - if ("finished" in event) { - this.finalEventValue = event - this.finalEventResolve(event) - break - } + if ("finished" in event) break } } catch (e) { - // If we catch an error, we need to reject the final promise and also + // If we catch an error, we need to store the error and also // notify any additional listeners, so that each of them can stop // listening and throw the error to their outer caller. - this.finalEventReject(e) - for (const listener of this.additionalListeners) - listener({ uncaughtError: e }) - - // Finally we'll await on the promise we rejected above, to make sure - // that the error is thrown to the outer caller, and ensure that it - // doesn't "appear to be" an uncaught promise rejection. - await this.finalEventPromise + this.finalError = { uncaughtError: e } + for (const listener of this.additionalListeners) listener(this.finalError) + + // Finally, we throw it to our own caller/listener. + throw e } finally { // We need to make sure we mark the stream as finished, even in the // case where the stream had an error before receiving the final event. @@ -135,13 +114,12 @@ export class KurtResult // First, catch up on any events we missed so far. for (const event of this.seenEvents) yield event - // If the stream is already finished (or errored), we don't need to actually - // set up a listener. We can just join onto the final event promise, - // to make sure we throw the error from it if the promise was rejected. - if (this.finished) { - await this.finalEventPromise - return - } + // Now, if we have a final error, we need to throw it to our caller. + if (this.finalError) throw this.finalError.uncaughtError + + // If we know that the stream is already finished, we can stop here and + // avoid unnecessarily creating an actual listener mechanism. + if (this.finished) return // To make this generator work, we need to set up a replaceable promise // that will receive the next event (or error) via the listener callback. @@ -171,7 +149,7 @@ export class KurtResult try { // Now our generator is as simple as waiting on the next promise, // over and over again, until we see the final event. - while (!this.finalEventValue) { + while (!this.finished) { const event = await nextEvent yield event if ("finished" in event) break @@ -182,3 +160,17 @@ export class KurtResult } } } + +async function toFinal( + result: KurtResult +): Promise> { + for await (const event of result) { + if ("finished" in event) { + return event + } + } + throw new Error( + "KurtResult never sent a finish event (or an error). " + + "This is a bug in the Kurt library that should be reported" + ) +}