diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index e256617b4..acd1b2dcb 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -564,7 +564,13 @@ export async function* pipe( ): AsyncIterable { const [transforms, opt] = pickTransforms(rest); let abortable: Abortable | undefined; - let iterable: AsyncIterable = source; + const sourceIt = source[Symbol.asyncIterator](); + const cachedSource = { + [Symbol.asyncIterator]() { + return sourceIt; + }, + }; + let iterable: AsyncIterable = cachedSource; if (opt?.propagateDownStreamError === true) { iterable = abortable = makeIterableAbortable(iterable); } @@ -594,14 +600,12 @@ export async function* pipe( // Call return on the source iterable to indicate // that we will no longer consume it and it should // cleanup any allocated resources. - source[Symbol.asyncIterator]() - .return?.() - .catch(() => { - // return returns a promise, which we don't care about. - // - // Uncaught promises are thrown at sometime/somewhere by the event loop, - // this is to ensure error is caught and ignored. - }); + sourceIt.return?.().catch(() => { + // return returns a promise, which we don't care about. + // + // Uncaught promises are thrown at sometime/somewhere by the event loop, + // this is to ensure error is caught and ignored. + }); } } } diff --git a/packages/connect/src/protocol/universal-fetch.ts b/packages/connect/src/protocol/universal-fetch.ts index 9363f9f6e..3ccec4ce8 100644 --- a/packages/connect/src/protocol/universal-fetch.ts +++ b/packages/connect/src/protocol/universal-fetch.ts @@ -152,9 +152,9 @@ function iterableToReadableStream( function iterableFromReadableStream( body: ReadableStream | null, ): AsyncIterable { + const reader = body?.getReader(); return { [Symbol.asyncIterator](): AsyncIterator { - const reader = body?.getReader(); return { async next() { if (reader !== undefined) {