Skip to content

Commit

Permalink
Support lazy UnionIterator.
Browse files Browse the repository at this point in the history
Closes #19
  • Loading branch information
RubenVerborgh committed Jul 3, 2020
1 parent acd905e commit 2e3e6a2
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 13 deletions.
50 changes: 37 additions & 13 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
*/
export class UnionIterator<T> extends BufferedIterator<T> {
private _sources : InternalSource<T>[] = [];
private _sourcesComplete = true;
private _pending? : { sources?: AsyncIterator<AsyncIterator<T>> };
private _currentSource = -1;

/**
Expand All @@ -1427,30 +1427,49 @@ export class UnionIterator<T> extends BufferedIterator<T> {
@param {object} [options] Settings of the iterator
*/
constructor(sources: AsyncIteratorOrArray<AsyncIterator<T>>,
options?: BufferedIteratorOptions) {
options: BufferedIteratorOptions = {}) {
super(options);
const autoStart = options.autoStart !== false;

// Sources have been passed as an iterator
if (isEventEmitter(sources)) {
sources.on('error', error => this.emit('error', error));
this._pending = { sources };
if (autoStart)
this._loadSources();
}
// Sources have been passed as a non-empty array
if (Array.isArray(sources) && sources.length > 0) {
else if (Array.isArray(sources) && sources.length > 0) {
for (const source of sources)
this._addSource(source as InternalSource<T>);
}
// Sources have been passed as an open iterator
else if (isEventEmitter(sources) && !sources.done) {
this._sourcesComplete = false;
// Sources are an empty list
else if (autoStart) {
this.close();
}
}

// Loads sources passed as an iterator
protected _loadSources() {
// Obtain sources iterator
const sources = this._pending!.sources!;
delete this._pending!.sources;

// Close immediately if done
if (sources.done) {
delete this._pending;
this.close();
}
// Otherwise, set up source reading
else {
sources.on('data', source => {
this._addSource(source as InternalSource<T>);
this._fillBufferAsync();
});
sources.on('end', () => {
this._sourcesComplete = true;
delete this._pending;
this._fillBuffer();
});
sources.on('error', error => this.emit('error', error));
}
// Sources are an empty list
else {
this.close();
}
}

Expand Down Expand Up @@ -1478,6 +1497,10 @@ export class UnionIterator<T> extends BufferedIterator<T> {

// Reads items from the next sources
protected _read(count: number, done: () => void): void {
// Start source loading if needed
if (this._pending?.sources)
this._loadSources();

// Try to read `count` items
let lastCount = 0, item : T | null;
while (lastCount !== (lastCount = count)) {
Expand All @@ -1493,8 +1516,9 @@ export class UnionIterator<T> extends BufferedIterator<T> {
}
}
}

// Close this iterator if all of its sources have been read
if (this._sourcesComplete && this._sources.length === 0)
if (!this._pending && this._sources.length === 0)
this.close();
done();
}
Expand Down
142 changes: 142 additions & 0 deletions test/UnionIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,31 @@ describe('UnionIterator', () => {
});
});

describe('when constructed with an array of 0 sources without autoStart', () => {
let iterator;
before(() => {
const sources = [];
iterator = new UnionIterator(sources, { autoStart: false });
});

describe('before reading', () => {
it('should not have ended', () => {
iterator.ended.should.be.false;
});
});

describe('after reading', () => {
before(done => {
iterator.read();
queueMicrotask(done);
});

it('should have ended', () => {
iterator.ended.should.be.true;
});
});
});

describe('when constructed with an array of 1 source', () => {
let iterator;
before(() => {
Expand Down Expand Up @@ -127,6 +152,31 @@ describe('UnionIterator', () => {
});
});

describe('when constructed with an iterator of 0 sources without autoStart', () => {
let iterator;
before(() => {
const sources = [];
iterator = new UnionIterator(new ArrayIterator(sources), { autoStart: false });
});

describe('before reading', () => {
it('should not have ended', () => {
iterator.ended.should.be.false;
});
});

describe('after reading', () => {
before(done => {
iterator.read();
queueMicrotask(done);
});

it('should have ended', () => {
iterator.ended.should.be.true;
});
});
});

describe('when constructed with an iterator of 1 source', () => {
let iterator;
before(() => {
Expand Down Expand Up @@ -166,6 +216,98 @@ describe('UnionIterator', () => {
});
});

describe('when constructed with an iterator and with autoStart', () => {
let iterator, sourceIterator;
before(() => {
const sources = [range(0, 2), range(3, 6)];
sourceIterator = new ArrayIterator(sources);
sinon.spy(sourceIterator, 'read');
iterator = new UnionIterator(sourceIterator, { autoStart: true });
});

describe('before reading', () => {
it('should have read the sources', () => {
sourceIterator.read.should.have.been.called;
});

it('should not have ended', () => {
iterator.ended.should.be.false;
});

it('should pass errors', () => {
const callback = sinon.spy();
const error = new Error('error');
iterator.once('error', callback);
sourceIterator.emit('error', error);
callback.should.have.been.calledOnce;
callback.should.have.been.calledWith(error);
});
});

describe('after reading', () => {
let items;
before(async () => {
items = (await toArray(iterator)).sort();
});

it('should have emitted all items', () => {
items.should.eql([0, 1, 2, 3, 4, 5, 6]);
});

it('should have ended', () => {
iterator.ended.should.be.true;
});
});
});

describe('when constructed with an iterator and without autoStart', () => {
let iterator, sourceIterator;
before(() => {
const sources = [range(0, 2), range(3, 6)];
sourceIterator = new ArrayIterator(sources);
sinon.spy(sourceIterator, 'read');
iterator = new UnionIterator(sourceIterator, { autoStart: false });
});

describe('before reading', () => {
it('should not have read the sources', () => {
sourceIterator.read.should.not.have.been.called;
});

it('should not have ended', () => {
iterator.ended.should.be.false;
});

it('should pass errors', () => {
const callback = sinon.spy();
const error = new Error('error');
iterator.once('error', callback);
sourceIterator.emit('error', error);
callback.should.have.been.calledOnce;
callback.should.have.been.calledWith(error);
});
});

describe('after reading', () => {
let items;
before(async () => {
items = (await toArray(iterator)).sort();
});

it('should have read the sources', () => {
sourceIterator.read.should.have.been.called;
});

it('should have emitted all items', () => {
items.should.eql([0, 1, 2, 3, 4, 5, 6]);
});

it('should have ended', () => {
iterator.ended.should.be.true;
});
});
});

describe('a UnionIterator with two sources', () => {
let iterator, sources;

Expand Down

0 comments on commit 2e3e6a2

Please sign in to comment.