Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UnionIterator skips not readable subiterators #106

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1717,7 +1717,7 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
@extends module:asynciterator.BufferedIterator
*/
export class UnionIterator<T> extends BufferedIterator<T> {
private _sources : InternalSource<T>[] = [];
private _sources : {read: boolean, source: InternalSource<T>}[] = [];
maartyman marked this conversation as resolved.
Show resolved Hide resolved
private _pending? : { loading: boolean, sources?: AsyncIterator<MaybePromise<AsyncIterator<T>>> };
private _currentSource = -1;
protected _destroySources: boolean;
Expand Down Expand Up @@ -1784,10 +1784,14 @@ export class UnionIterator<T> extends BufferedIterator<T> {
if (isPromise(source))
source = wrap<T>(source) as any as InternalSource<T>;
if (!source.done) {
this._sources.push(source);
const sourceObj = { read: true, source };
this._sources.push(sourceObj);
source[DESTINATION] = this;
source.on('error', destinationEmitError);
source.on('readable', destinationFillBuffer);
source.on('readable', () => {
sourceObj.read = true;
maartyman marked this conversation as resolved.
Show resolved Hide resolved
maartyman marked this conversation as resolved.
Show resolved Hide resolved
destinationFillBuffer.bind(<InternalSource<unknown>>sourceObj.source)();
});
source.on('end', destinationRemoveEmptySources);
}
}
Expand All @@ -1796,9 +1800,9 @@ export class UnionIterator<T> extends BufferedIterator<T> {
protected _removeEmptySources() {
this._sources = this._sources.filter((source, index) => {
// Adjust the index of the current source if needed
if (source.done && index <= this._currentSource)
if (source.source.done && index <= this._currentSource)
this._currentSource--;
return !source.done;
return !source.source.done;
});
this._fillBuffer();
}
Expand All @@ -1818,7 +1822,7 @@ export class UnionIterator<T> extends BufferedIterator<T> {
this._currentSource = (this._currentSource + 1) % this._sources.length;
const source = this._sources[this._currentSource];
// Attempt to read an item from that source
if ((item = source.read()) !== null) {
if ((item = source.source.read()) !== null) {
count--;
this._push(item);
}
Expand All @@ -1837,7 +1841,7 @@ export class UnionIterator<T> extends BufferedIterator<T> {
// Destroy all sources that are still readable
if (this._destroySources) {
for (const source of this._sources)
source.destroy();
source.source.destroy();

// Also close the sources stream if applicable
if (this._pending) {
Expand Down
Loading