From accdde5e94dca000e44d160fb53fb80b66f45ab5 Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sat, 4 Jul 2020 17:45:47 +0200 Subject: [PATCH] Allow customizing the task scheduler. Closes https://github.com/RubenVerborgh/AsyncIterator/issues/17 --- asynciterator.ts | 41 ++++++++++++++++------ test/AsyncIterator-test.js | 4 +-- test/BufferedIterator-test.js | 20 +++++------ test/MultiTransformIterator-test.js | 14 ++++---- test/SimpleTransformIterator-test.js | 4 +-- test/TaskScheduler-test.js | 51 ++++++++++++++++++++++++++++ test/TransformIterator-test.js | 6 ++-- test/UnionIterator-test.js | 10 +++--- 8 files changed, 109 insertions(+), 41 deletions(-) create mode 100644 test/TaskScheduler-test.js diff --git a/asynciterator.ts b/asynciterator.ts index e0ca3b6..d2d6c3b 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -6,6 +6,23 @@ import { EventEmitter } from 'events'; import queueMicrotask from 'queue-microtask'; +let taskScheduler: TaskScheduler = queueMicrotask; + +/** Schedules the given ask for asynchronous execution. */ +export function scheduleTask(task: () => void): void { + taskScheduler(task); +} + +/** Returns the asynchronous task scheduler. */ +export function getTaskScheduler(): TaskScheduler { + return taskScheduler; +} + +/** Sets the asynchronous task scheduler. */ +export function setTaskScheduler(scheduler: TaskScheduler): void { + taskScheduler = scheduler; +} + /** ID of the INIT state. An iterator is initializing if it is preparing main item generation. @@ -90,7 +107,7 @@ export class AsyncIterator extends EventEmitter { if (!eventAsync) this.emit('end'); else - queueMicrotask(() => this.emit('end')); + taskScheduler(() => this.emit('end')); } } return valid; @@ -210,7 +227,7 @@ export class AsyncIterator extends EventEmitter { @protected */ protected _endAsync() { - queueMicrotask(() => this._end()); + taskScheduler(() => this._end()); } /** @@ -236,7 +253,7 @@ export class AsyncIterator extends EventEmitter { this._readable = readable; // If the iterator became readable, emit the `readable` event if (readable) - queueMicrotask(() => this.emit('readable')); + taskScheduler(() => this.emit('readable')); } } @@ -308,7 +325,7 @@ export class AsyncIterator extends EventEmitter { return properties && properties[propertyName]; // If the value has been set, send it through the callback if (properties && (propertyName in properties)) { - queueMicrotask(() => callback(properties[propertyName])); + taskScheduler(() => callback(properties[propertyName])); } // If the value was not set, store the callback for when the value will be set else { @@ -336,7 +353,7 @@ export class AsyncIterator extends EventEmitter { const callbacks = propertyCallbacks[propertyName]; if (callbacks) { delete propertyCallbacks[propertyName]; - queueMicrotask(() => { + taskScheduler(() => { for (const callback of callbacks) callback(value); }); @@ -502,7 +519,7 @@ function waitForDataListener(this: AsyncIterator, eventName: string) { this.removeListener('newListener', waitForDataListener); addSingleListener(this, 'readable', emitData); if (this.readable) - queueMicrotask(() => emitData.call(this)); + taskScheduler(() => emitData.call(this)); } } // Emits new items though `data` events as long as there are `data` listeners @@ -710,7 +727,7 @@ export class BufferedIterator extends AsyncIterator { constructor({ maxBufferSize = 4, autoStart = true } = {}) { super(INIT); this.maxBufferSize = maxBufferSize; - queueMicrotask(() => this._init(autoStart)); + taskScheduler(() => this._init(autoStart)); } /** @@ -886,7 +903,7 @@ export class BufferedIterator extends AsyncIterator { // Acquire reading lock to avoid recursive reads if (!this._reading) { this._reading = true; - queueMicrotask(() => { + taskScheduler(() => { // Release reading lock so _fillBuffer` can take it this._reading = false; this._fillBuffer(); @@ -1072,7 +1089,7 @@ export class TransformIterator extends BufferedIterator { const next = () => { // Continue transforming until at least `count` items have been pushed if (this._pushedCount < count && !this.closed) - queueMicrotask(() => this._readAndTransform(next, done)); + taskScheduler(() => this._readAndTransform(next, done)); else done(); }; @@ -1220,10 +1237,10 @@ export class SimpleTransformIterator extends TransformIterator { /* Tries to read and transform items */ protected _read(count: number, done: () => void) { const next = () => this._readAndTransformSimple(count, nextAsync, done); + this._readAndTransformSimple(count, nextAsync, done); function nextAsync() { - queueMicrotask(next); + taskScheduler(next); } - this._readAndTransformSimple(count, nextAsync, done); } /* Reads and transform items */ @@ -1858,3 +1875,5 @@ type SourceExpression = type InternalSource = AsyncIterator & { _destination: AsyncIterator }; + +type TaskScheduler = (task: () => void) => void; diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index ef9c1a0..bc34388 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -4,10 +4,10 @@ import { CLOSED, ENDED, DESTROYED, + scheduleTask, } from '../asynciterator.mjs'; import { EventEmitter } from 'events'; -import queueMicrotask from 'queue-microtask'; describe('AsyncIterator', () => { describe('The AsyncIterator module', () => { @@ -408,7 +408,7 @@ describe('AsyncIterator', () => { before(() => { iterator = new AsyncIterator(); captureEvents(iterator, 'data', 'readable', 'end'); - iterator._destroy = (error, callback) => queueMicrotask(callback); + iterator._destroy = (error, callback) => scheduleTask(callback); iterator.destroy(); }); diff --git a/test/BufferedIterator-test.js b/test/BufferedIterator-test.js index f198bb4..2a50b33 100644 --- a/test/BufferedIterator-test.js +++ b/test/BufferedIterator-test.js @@ -2,10 +2,10 @@ import { AsyncIterator, BufferedIterator, CLOSED, + scheduleTask, } from '../asynciterator.mjs'; import { EventEmitter } from 'events'; -import queueMicrotask from 'queue-microtask'; describe('BufferedIterator', () => { describe('The BufferedIterator function', () => { @@ -301,7 +301,7 @@ describe('BufferedIterator', () => { function createIterator(options) { const iterator = new BufferedIterator(options); iterator._read = function (count, done) { - queueMicrotask(() => { + scheduleTask(() => { this.close(); done(); }); @@ -660,8 +660,8 @@ describe('BufferedIterator', () => { describe('after `read` is called and the iterator has been closed', () => { before(() => { iterator.read(); - queueMicrotask(() => { _readDone(); }); - queueMicrotask(() => { iterator.close(); }); + scheduleTask(() => { _readDone(); }); + scheduleTask(() => { iterator.close(); }); }); it('should have emitted the `end` event', () => { @@ -1292,7 +1292,7 @@ describe('BufferedIterator', () => { const iterator = new BufferedIterator(options); iterator._read = function (count, done) { this._push('a'); - queueMicrotask(() => { + scheduleTask(() => { this._push('b'); this._push('c'); done(); @@ -1408,9 +1408,7 @@ describe('BufferedIterator', () => { before(done => { iterator = new BufferedIterator({ autoStart: false }); iterator._read = (count, callback) => { readDone = callback; }; - // `queueMicrotask` because reading directly after construction does not call `_read`; - // this is necessary to enable attaching a `_begin` hook after construction - queueMicrotask(() => { iterator.read(); done(); }); + scheduleTask(() => { iterator.read(); done(); }); }); it('should cause an exception', () => { @@ -1637,7 +1635,7 @@ describe('BufferedIterator', () => { before(() => { iterator = new BufferedIterator(); iterator._begin = function (done) { - queueMicrotask(() => { + scheduleTask(() => { this._push('x'); this._push('y'); done(); @@ -1947,7 +1945,7 @@ describe('BufferedIterator', () => { done(); }; iterator._flush = function (done) { - queueMicrotask(() => { + scheduleTask(() => { this._push('x'); this._push('y'); done(); @@ -2206,7 +2204,7 @@ describe('BufferedIterator', () => { done(); }; iterator._flush = function (done) { - queueMicrotask(() => { + scheduleTask(() => { this._push('x'); this._push('y'); done(); diff --git a/test/MultiTransformIterator-test.js b/test/MultiTransformIterator-test.js index 036e99e..4e7ddd3 100644 --- a/test/MultiTransformIterator-test.js +++ b/test/MultiTransformIterator-test.js @@ -6,10 +6,10 @@ import { EmptyIterator, SingletonIterator, ArrayIterator, + scheduleTask, } from '../asynciterator.mjs'; import { EventEmitter } from 'events'; -import queueMicrotask from 'queue-microtask'; describe('MultiTransformIterator', () => { describe('The MultiTransformIterator function', () => { @@ -146,7 +146,7 @@ describe('MultiTransformIterator', () => { iterator = new MultiTransformIterator(source); iterator._createTransformer = sinon.spy(() => { const transformer = new BufferedIterator(); - setImmediate(() => transformer.close()); + setTimeout(() => transformer.close(), 0); return transformer; }); }); @@ -181,7 +181,7 @@ describe('MultiTransformIterator', () => { iterator = new MultiTransformIterator(source); iterator._createTransformer = sinon.spy(item => { const transformer = new BufferedIterator(); - queueMicrotask(() => { + scheduleTask(() => { transformer._push(`${item}1`); transformer.close(); }); @@ -209,7 +209,7 @@ describe('MultiTransformIterator', () => { iterator = new MultiTransformIterator(source); iterator._createTransformer = sinon.spy(item => { const transformer = new BufferedIterator(); - queueMicrotask(() => { + scheduleTask(() => { transformer._push(`${item}1`); transformer._push(`${item}2`); transformer._push(`${item}3`); @@ -300,7 +300,7 @@ describe('MultiTransformIterator', () => { iterator = new MultiTransformIterator(source); iterator._createTransformer = sinon.spy(item => { const transformer = new BufferedIterator(); - queueMicrotask(() => { + scheduleTask(() => { transformer.emit('error', new Error(`Error ${item}`)); }); return transformer; @@ -319,7 +319,7 @@ describe('MultiTransformIterator', () => { source = new ArrayIterator(['a', 'b', 'c', 'd', 'e', 'f']); const multiTransform = sinon.spy(item => { const transformer = new BufferedIterator(); - queueMicrotask(() => { + scheduleTask(() => { transformer._push(`${item}1`); transformer._push(`${item}2`); transformer._push(`${item}3`); @@ -356,7 +356,7 @@ describe('MultiTransformIterator', () => { source = new ArrayIterator(['a', 'b', 'c', 'd', 'e', 'f']); const multiTransform = sinon.spy(item => { const transformer = new BufferedIterator(); - queueMicrotask(() => { + scheduleTask(() => { transformer._push(`${item}1`); transformer._push(`${item}2`); transformer._push(`${item}3`); diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index ff0d111..7f6d80b 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -6,10 +6,10 @@ import { EmptyIterator, ArrayIterator, IntegerIterator, + scheduleTask, } from '../asynciterator.mjs'; import { EventEmitter } from 'events'; -import queueMicrotask from 'queue-microtask'; describe('SimpleTransformIterator', () => { describe('The SimpleTransformIterator function', () => { @@ -163,7 +163,7 @@ describe('SimpleTransformIterator', () => { source = new ArrayIterator(['a', 'b', 'c']); transform = sinon.spy(function (item, done) { this._push(item + (++i)); - queueMicrotask(done); + scheduleTask(done); }); iterator = new SimpleTransformIterator(source, transform); }); diff --git a/test/TaskScheduler-test.js b/test/TaskScheduler-test.js new file mode 100644 index 0000000..34ff495 --- /dev/null +++ b/test/TaskScheduler-test.js @@ -0,0 +1,51 @@ +import { + scheduleTask, + getTaskScheduler, + setTaskScheduler, +} from '../asynciterator.mjs'; + +describe('TaskScheduler', () => { + describe('scheduleTask', () => { + it('is a function', () => { + expect(scheduleTask).to.be.an.instanceof(Function); + }); + + it('schedules a task', done => { + scheduleTask(done); + }); + }); + + describe('getTaskScheduler', () => { + it('is a function', () => { + expect(getTaskScheduler).to.be.an.instanceof(Function); + }); + + it('returns a task scheduler', done => { + const scheduler = getTaskScheduler(); + scheduler(done); + }); + }); + + describe('setTaskScheduler', () => { + it('is a function', () => { + expect(setTaskScheduler).to.be.an.instanceof(Function); + }); + + it('allows setting the task scheduler', () => { + const scheduler = getTaskScheduler(); + expect(getTaskScheduler()).to.equal(scheduler); + + const newScheduler = sinon.spy(); + setTaskScheduler(newScheduler); + expect(getTaskScheduler()).to.equal(newScheduler); + expect(newScheduler).to.not.have.been.called; + + const task = sinon.spy(); + scheduleTask(task); + expect(newScheduler).to.have.been.calledOnce; + expect(newScheduler).to.have.been.calledWith(task); + + setTaskScheduler(scheduler); + }); + }); +}); diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index 9edf660..6561296 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -5,10 +5,10 @@ import { ArrayIterator, TransformIterator, wrap, + scheduleTask, } from '../asynciterator.mjs'; import { EventEmitter } from 'events'; -import queueMicrotask from 'queue-microtask'; describe('TransformIterator', () => { describe('The TransformIterator function', () => { @@ -476,7 +476,7 @@ describe('TransformIterator', () => { before(() => { iterator = new TransformIterator(source = new ArrayIterator(['a', 'b', 'c'])); iterator._transform = function (item, done) { - queueMicrotask(() => { + scheduleTask(() => { iterator._push(`${item}1`); iterator._push(`${item}2`); done(); @@ -588,7 +588,7 @@ describe('TransformIterator', () => { iterator = new TransformIterator(source); iterator._transform = sinon.spy(function (item, done) { this._push(item + (++i)); - queueMicrotask(done); + scheduleTask(done); }); }); diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index 954218a..65ea83a 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -6,10 +6,10 @@ import { EmptyIterator, union, range, + scheduleTask, } from '../asynciterator.mjs'; import { EventEmitter } from 'events'; -import queueMicrotask from 'queue-microtask'; describe('UnionIterator', () => { describe('The UnionIterator function', () => { @@ -96,7 +96,7 @@ describe('UnionIterator', () => { describe('after reading', () => { before(done => { iterator.read(); - queueMicrotask(done); + scheduleTask(done); }); it('should have ended', () => { @@ -168,7 +168,7 @@ describe('UnionIterator', () => { describe('after reading', () => { before(done => { iterator.read(); - queueMicrotask(done); + scheduleTask(done); }); it('should have ended', () => { @@ -405,7 +405,7 @@ describe('UnionIterator', () => { expect(iterator.read()).to.equal(7); // Buffer - await new Promise(resolve => queueMicrotask(resolve)); + await new Promise(resolve => scheduleTask(resolve)); // Read remaining items expect(iterator.read()).to.equal(5); @@ -436,7 +436,7 @@ describe('UnionIterator', () => { const delayed = new AsyncIterator(); const iterator = new UnionIterator(delayed); delayed.readable = true; - queueMicrotask(() => delayed.close()); + scheduleTask(() => delayed.close()); (await toArray(iterator)).should.eql([]); }); });