diff --git a/src/ComunicaEngine.js b/src/ComunicaEngine.js index c86f8f7..8c2824a 100644 --- a/src/ComunicaEngine.js +++ b/src/ComunicaEngine.js @@ -81,38 +81,61 @@ export default class ComunicaEngine { * Transforms the readable into an asynchronously iterable object */ streamToAsyncIterable(readable) { - // Track errors even when no next item is being requested - let pendingError; - readable.once('error', error => pendingError = error); - // Return a asynchronous iterable + let done = false; + let pendingError = null; + let pendingPromise = null; + + readable.on('end', endIterator); + readable.on('error', rejectPromise); + readable.on('readable', settlePromise); + return { - next: () => new Promise(readNext), + next: () => new Promise(trackPromise), [Symbol.asyncIterator]() { return this; }, }; - // Reads the next item - function readNext(resolve, reject) { - if (pendingError) - return reject(pendingError); - if (readable.ended) - return resolve({ done: true }); - - // Attach stream listeners - readable.on('data', yieldValue); - readable.on('end', finish); - readable.on('error', finish); - - // Outputs the value through the iterable - function yieldValue(value) { - finish(null, value, true); + function trackPromise(resolve, reject) { + pendingPromise = { resolve, reject }; + settlePromise(); + } + + function endIterator() { + done = true; + settlePromise(); + } + + function settlePromise() { + // Finish if the stream errored or ended + if (done || pendingError) { + finish(); } - // Clean up, and reflect the state in the iterable - function finish(error, value, pending) { - readable.removeListener('data', yieldValue); - readable.removeListener('end', finish); - readable.removeListener('error', finish); - return error ? reject(error) : resolve({ value, done: !pending }); + // Try to resolve the promise with a value + else if (pendingPromise) { + const value = readable.read(); + if (value !== null) { + pendingPromise.resolve({ value }); + pendingPromise = null; + } + } + } + + function rejectPromise(error) { + if (!pendingError) + pendingError = error; + finish(); + } + + function finish() { + if (pendingPromise) { + if (done) + pendingPromise.resolve({ done }); + else + pendingPromise.reject(pendingError); + pendingPromise = null; } + readable.on('end', endIterator); + readable.on('error', rejectPromise); + readable.on('readable', settlePromise); } } diff --git a/test/ComunicaEngine-test.js b/test/ComunicaEngine-test.js index 98ff876..6620c27 100644 --- a/test/ComunicaEngine-test.js +++ b/test/ComunicaEngine-test.js @@ -133,16 +133,25 @@ describe('An ComunicaEngine instance without default source', () => { it('reads an ended stream', async () => { const stream = new Readable(); - stream.ended = true; + stream.push(null); const result = engine.streamToAsyncIterable(stream); expect(await readAll(result)).toHaveLength(0); }); + it('reads a stream that ends immediately', async () => { + const stream = new Readable(); + const result = engine.streamToAsyncIterable(stream); + stream.push(null); + await new Promise(resolve => setImmediate(resolve)); + expect(await readAll(result)).toHaveLength(0); + }); + it('throws an error when the stream errors before reading starts', async () => { const stream = new Readable(); stream._read = () => {}; const result = engine.streamToAsyncIterable(stream); stream.emit('error', new Error('my error')); + stream.emit('error', new Error('my other error')); await expect(readAll(result)).rejects.toThrow('my error'); });