Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Simplify the internal implementation of KurtResult. #6

Merged
merged 1 commit into from
May 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 38 additions & 46 deletions src/KurtResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,15 @@ export class KurtResult<T extends KurtSchemaInnerMaybe = undefined>
private started: boolean = false
private finished: boolean = false
private seenEvents: KurtResultEvent<T>[] = []
private finalError?: { uncaughtError: unknown }
private additionalListeners = new Set<_AdditionalListener<T>>()
private finalEventValue?: KurtResultEventFinal<T>
private finalEventPromise: Promise<KurtResultEventFinal<T>>
private finalEventResolve!: (e: Promisable<KurtResultEventFinal<T>>) => void
private finalEventReject!: (reason?: unknown) => void

// Create a new result stream, from the given underlying stream generator.
constructor(private gen: AsyncGenerator<KurtResultEvent<T>>) {
this.finalEventPromise = new Promise((resolve, reject) => {
this.finalEventResolve = resolve
this.finalEventReject = reject
})
}
constructor(private gen: AsyncGenerator<KurtResultEvent<T>>) {}

// Get the final event from the end of the result stream, when it is ready.
get finalEvent(): Promise<KurtResultEventFinal<T>> {
return this._finalEvent()
}
private async _finalEvent() {
// If nobody has started listening to the stream yet, we need to be
// the first listener of it. This makes sure we will eventually fulfill
// the promise that we are about to await on.
if (!this.started) for await (const _ of this) null

// Now wait for the underlying promise to fulfill.
return this.finalEventPromise
return toFinal(this)
}

// Get the text from the end of the result stream, when it is ready.
Expand Down Expand Up @@ -93,35 +76,31 @@ export class KurtResult<T extends KurtSchemaInnerMaybe = undefined>
this.started = true
try {
for await (const event of this.gen) {
// Yield the event, capture it in our internal event history,
// Capture the event in our internal event history,
// and push it to any additional listeners who are waiting.
yield event
this.seenEvents.push(event)
for (const listener of this.additionalListeners) listener(event)

// If this is the final event, store it, resolve the final promise,
// and break out of the event-receiving loop.
// We need to yield the event *AFTER* we've done the bookkeeping above,
// because our caller may decide to stop calling the generator at
// any time, and we wouldn't want that to break other listeners.
yield event

// If this is the final event, break out of the event-receiving loop.
//
// If we have a misbehaving underlying generator that has more events
// after the finish event, we'll be ignoring those.
if ("finished" in event) {
this.finalEventValue = event
this.finalEventResolve(event)
break
}
if ("finished" in event) break
}
} catch (e) {
// If we catch an error, we need to reject the final promise and also
// If we catch an error, we need to store the error and also
// notify any additional listeners, so that each of them can stop
// listening and throw the error to their outer caller.
this.finalEventReject(e)
for (const listener of this.additionalListeners)
listener({ uncaughtError: e })

// Finally we'll await on the promise we rejected above, to make sure
// that the error is thrown to the outer caller, and ensure that it
// doesn't "appear to be" an uncaught promise rejection.
await this.finalEventPromise
this.finalError = { uncaughtError: e }
for (const listener of this.additionalListeners) listener(this.finalError)

// Finally, we throw it to our own caller/listener.
throw e
} finally {
// We need to make sure we mark the stream as finished, even in the
// case where the stream had an error before receiving the final event.
Expand All @@ -135,13 +114,12 @@ export class KurtResult<T extends KurtSchemaInnerMaybe = undefined>
// First, catch up on any events we missed so far.
for (const event of this.seenEvents) yield event

// If the stream is already finished (or errored), we don't need to actually
// set up a listener. We can just join onto the final event promise,
// to make sure we throw the error from it if the promise was rejected.
if (this.finished) {
await this.finalEventPromise
return
}
// Now, if we have a final error, we need to throw it to our caller.
if (this.finalError) throw this.finalError.uncaughtError

// If we know that the stream is already finished, we can stop here and
// avoid unnecessarily creating an actual listener mechanism.
if (this.finished) return

// To make this generator work, we need to set up a replaceable promise
// that will receive the next event (or error) via the listener callback.
Expand Down Expand Up @@ -171,7 +149,7 @@ export class KurtResult<T extends KurtSchemaInnerMaybe = undefined>
try {
// Now our generator is as simple as waiting on the next promise,
// over and over again, until we see the final event.
while (!this.finalEventValue) {
while (!this.finished) {
const event = await nextEvent
yield event
if ("finished" in event) break
Expand All @@ -182,3 +160,17 @@ export class KurtResult<T extends KurtSchemaInnerMaybe = undefined>
}
}
}

async function toFinal<T extends KurtSchemaInnerMaybe = undefined>(
result: KurtResult<T>
): Promise<KurtResultEventFinal<T>> {
for await (const event of result) {
if ("finished" in event) {
return event
}
}
throw new Error(
"KurtResult never sent a finish event (or an error). " +
"This is a bug in the Kurt library that should be reported"
)
}
Loading