Skip to content

Commit

Permalink
Re-implement streamToAsyncIterable without data event.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Sep 14, 2020
1 parent 4f0c29b commit f6419f7
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 27 deletions.
75 changes: 49 additions & 26 deletions src/ComunicaEngine.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,38 +81,61 @@ export default class ComunicaEngine {
* Transforms the readable into an asynchronously iterable object
*/
streamToAsyncIterable(readable) {
// Track errors even when no next item is being requested
let pendingError;
readable.once('error', error => pendingError = error);
// Return a asynchronous iterable
let done = false;
let pendingError = null;
let pendingPromise = null;

readable.on('end', endIterator);
readable.on('error', rejectPromise);
readable.on('readable', settlePromise);

return {
next: () => new Promise(readNext),
next: () => new Promise(trackPromise),
[Symbol.asyncIterator]() { return this; },
};

// Reads the next item
function readNext(resolve, reject) {
if (pendingError)
return reject(pendingError);
if (readable.ended)
return resolve({ done: true });

// Attach stream listeners
readable.on('data', yieldValue);
readable.on('end', finish);
readable.on('error', finish);

// Outputs the value through the iterable
function yieldValue(value) {
finish(null, value, true);
function trackPromise(resolve, reject) {
pendingPromise = { resolve, reject };
settlePromise();
}

function endIterator() {
done = true;
settlePromise();
}

function settlePromise() {
// Finish if the stream errored or ended
if (done || pendingError) {
finish();
}
// Clean up, and reflect the state in the iterable
function finish(error, value, pending) {
readable.removeListener('data', yieldValue);
readable.removeListener('end', finish);
readable.removeListener('error', finish);
return error ? reject(error) : resolve({ value, done: !pending });
// Try to resolve the promise with a value
else if (pendingPromise) {
const value = readable.read();
if (value !== null) {
pendingPromise.resolve({ value });
pendingPromise = null;
}
}
}

function rejectPromise(error) {
if (!pendingError)
pendingError = error;
finish();
}

function finish() {
if (pendingPromise) {
if (done)
pendingPromise.resolve({ done });
else
pendingPromise.reject(pendingError);
pendingPromise = null;
}
readable.on('end', endIterator);
readable.on('error', rejectPromise);
readable.on('readable', settlePromise);
}
}

Expand Down
11 changes: 10 additions & 1 deletion test/ComunicaEngine-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,25 @@ describe('An ComunicaEngine instance without default source', () => {

it('reads an ended stream', async () => {
const stream = new Readable();
stream.ended = true;
stream.push(null);
const result = engine.streamToAsyncIterable(stream);
expect(await readAll(result)).toHaveLength(0);
});

it('reads a stream that ends immediately', async () => {
const stream = new Readable();
const result = engine.streamToAsyncIterable(stream);
stream.push(null);
await new Promise(resolve => setImmediate(resolve));
expect(await readAll(result)).toHaveLength(0);
});

it('throws an error when the stream errors before reading starts', async () => {
const stream = new Readable();
stream._read = () => {};
const result = engine.streamToAsyncIterable(stream);
stream.emit('error', new Error('my error'));
stream.emit('error', new Error('my other error'));
await expect(readAll(result)).rejects.toThrow('my error');
});

Expand Down

0 comments on commit f6419f7

Please sign in to comment.