Skip to content

Commit

Permalink
Fix MultiTransformIterator not destroying transformers on close
Browse files Browse the repository at this point in the history
This was discovered in comunica/comunica#950

This may also be the root-cause of comunica/comunica#989
  • Loading branch information
rubensworks committed Jun 27, 2022
1 parent a536bca commit 5e58b31
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
10 changes: 10 additions & 0 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,16 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
if (!this._transformerQueue.length)
this.close();
}

protected _end(destroy: boolean) {
super._end(destroy);

// Also destroy the open transformers left in the queue
if (this._destroySource) {
for (const item of this._transformerQueue)
item.transformer.destroy();
}
}
}

/**
Expand Down
104 changes: 104 additions & 0 deletions test/MultiTransformIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,108 @@ describe('MultiTransformIterator', () => {
});
});
});

describe('A MultiTransformIterator with transformers that never end', () => {
let iterator, source, multiTransform;
beforeEach(() => {
source = new ArrayIterator(['a', 'b', 'c', 'd', 'e', 'f']);
multiTransform = sinon.spy(item => {
const transformer = new BufferedIterator();
scheduleTask(() => {
transformer._push(`${item}1`);
transformer._push(`${item}2`);
transformer._push(`${item}3`);
});
return transformer;
});
});

describe('with autoStart and with destroySource', () => {
beforeEach(() => {
iterator = new MultiTransformIterator(source, { multiTransform, autoStart: true, destroySource: true });
});

it('should destroy the transformers in the queue when closing', async () => {
// Wait until the iterator has produced one result
await new Promise(resolve => iterator.on('data', resolve));

// Immediately close the iterator
iterator.close();

// Wait until the iterator has ended
await new Promise(resolve => iterator.on('end', resolve));

// All transformers in the queue must have been closed
iterator._transformerQueue.length.should.equal(4);
for (const item of iterator._transformerQueue)
item.transformer.closed.should.be.true;
});
});

describe('without autoStart and with destroySource', () => {
beforeEach(() => {
iterator = new MultiTransformIterator(source, { multiTransform, autoStart: false, destroySource: true });
});

it('should destroy the transformers in the queue when closing', async () => {
// Wait until the iterator has produced one result
await new Promise(resolve => iterator.on('data', resolve));

// Immediately close the iterator
iterator.close();

// Wait until the iterator has ended
await new Promise(resolve => iterator.on('end', resolve));

// All transformers in the queue must have been closed
iterator._transformerQueue.length.should.equal(4);
for (const item of iterator._transformerQueue)
item.transformer.closed.should.be.true;
});
});

describe('with autoStart and without destroySource', () => {
beforeEach(() => {
iterator = new MultiTransformIterator(source, { multiTransform, autoStart: true, destroySource: false });
});

it('should destroy the transformers in the queue when closing', async () => {
// Wait until the iterator has produced one result
await new Promise(resolve => iterator.on('data', resolve));

// Immediately close the iterator
iterator.close();

// Wait until the iterator has ended
await new Promise(resolve => iterator.on('end', resolve));

// All transformers in the queue must not have been closed
iterator._transformerQueue.length.should.equal(4);
for (const item of iterator._transformerQueue)
item.transformer.closed.should.be.false;
});
});

describe('without autoStart and without destroySource', () => {
beforeEach(() => {
iterator = new MultiTransformIterator(source, { multiTransform, autoStart: false, destroySource: false });
});

it('should destroy the transformers in the queue when closing', async () => {
// Wait until the iterator has produced one result
await new Promise(resolve => iterator.on('data', resolve));

// Immediately close the iterator
iterator.close();

// Wait until the iterator has ended
await new Promise(resolve => iterator.on('end', resolve));

// All transformers in the queue must not have been closed
iterator._transformerQueue.length.should.equal(4);
for (const item of iterator._transformerQueue)
item.transformer.closed.should.be.false;
});
});
});
});

0 comments on commit 5e58b31

Please sign in to comment.