Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UnionIterator skips not readable subiterators #106

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1717,7 +1717,7 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
@extends module:asynciterator.BufferedIterator
*/
export class UnionIterator<T> extends BufferedIterator<T> {
private _sources : InternalSource<T>[] = [];
private _sources : { requiresRead: boolean, source: InternalSource<T> }[] = [];
private _pending? : { loading: boolean, sources?: AsyncIterator<MaybePromise<AsyncIterator<T>>> };
private _currentSource = -1;
protected _destroySources: boolean;
Expand Down Expand Up @@ -1784,10 +1784,14 @@ export class UnionIterator<T> extends BufferedIterator<T> {
if (isPromise(source))
source = wrap<T>(source) as any as InternalSource<T>;
if (!source.done) {
this._sources.push(source);
const sourceObj = { requiresRead: true, source };
this._sources.push(sourceObj);
source[DESTINATION] = this;
source.on('error', destinationEmitError);
source.on('readable', destinationFillBuffer);
source.on('readable', () => {
sourceObj.requiresRead = true;
destinationFillBuffer.bind(<InternalSource<unknown>>sourceObj.source)();
});
source.on('end', destinationRemoveEmptySources);
}
}
Expand All @@ -1796,9 +1800,9 @@ export class UnionIterator<T> extends BufferedIterator<T> {
protected _removeEmptySources() {
this._sources = this._sources.filter((source, index) => {
// Adjust the index of the current source if needed
if (source.done && index <= this._currentSource)
if (source.source.done && index <= this._currentSource)
this._currentSource--;
return !source.done;
return !source.source.done;
});
this._fillBuffer();
}
Expand All @@ -1817,8 +1821,11 @@ export class UnionIterator<T> extends BufferedIterator<T> {
// Pick the next source
this._currentSource = (this._currentSource + 1) % this._sources.length;
const source = this._sources[this._currentSource];
if (!source.source.readable && !source.requiresRead)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason reason why only having !source.source.readable is insufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to jumpstart the sub-iterators, I added this because I had issues with it in the past. If I remove it, all test still pass, so I guess it's not necessary? I guess an iterator should be created with readable set to true...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess an iterator should be created with readable set to true...

Yes, for iterators without autoStart, readable should be true.

continue;
source.requiresRead = false;
// Attempt to read an item from that source
if ((item = source.read()) !== null) {
if ((item = source.source.read()) !== null) {
count--;
this._push(item);
}
Expand All @@ -1837,7 +1844,7 @@ export class UnionIterator<T> extends BufferedIterator<T> {
// Destroy all sources that are still readable
if (this._destroySources) {
for (const source of this._sources)
source.destroy();
source.source.destroy();

// Also close the sources stream if applicable
if (this._pending) {
Expand Down
Loading