Skip to content

Commit

Permalink
Allow customizing the task scheduler.
Browse files Browse the repository at this point in the history
Closes #17
  • Loading branch information
RubenVerborgh committed Jul 4, 2020
1 parent 4e85c70 commit accdde5
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 41 deletions.
41 changes: 30 additions & 11 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@
import { EventEmitter } from 'events';
import queueMicrotask from 'queue-microtask';

let taskScheduler: TaskScheduler = queueMicrotask;

/** Schedules the given ask for asynchronous execution. */
export function scheduleTask(task: () => void): void {
taskScheduler(task);
}

/** Returns the asynchronous task scheduler. */
export function getTaskScheduler(): TaskScheduler {
return taskScheduler;
}

/** Sets the asynchronous task scheduler. */
export function setTaskScheduler(scheduler: TaskScheduler): void {
taskScheduler = scheduler;
}

/**
ID of the INIT state.
An iterator is initializing if it is preparing main item generation.
Expand Down Expand Up @@ -90,7 +107,7 @@ export class AsyncIterator<T> extends EventEmitter {
if (!eventAsync)
this.emit('end');
else
queueMicrotask(() => this.emit('end'));
taskScheduler(() => this.emit('end'));
}
}
return valid;
Expand Down Expand Up @@ -210,7 +227,7 @@ export class AsyncIterator<T> extends EventEmitter {
@protected
*/
protected _endAsync() {
queueMicrotask(() => this._end());
taskScheduler(() => this._end());
}

/**
Expand All @@ -236,7 +253,7 @@ export class AsyncIterator<T> extends EventEmitter {
this._readable = readable;
// If the iterator became readable, emit the `readable` event
if (readable)
queueMicrotask(() => this.emit('readable'));
taskScheduler(() => this.emit('readable'));
}
}

Expand Down Expand Up @@ -308,7 +325,7 @@ export class AsyncIterator<T> extends EventEmitter {
return properties && properties[propertyName];
// If the value has been set, send it through the callback
if (properties && (propertyName in properties)) {
queueMicrotask(() => callback(properties[propertyName]));
taskScheduler(() => callback(properties[propertyName]));
}
// If the value was not set, store the callback for when the value will be set
else {
Expand Down Expand Up @@ -336,7 +353,7 @@ export class AsyncIterator<T> extends EventEmitter {
const callbacks = propertyCallbacks[propertyName];
if (callbacks) {
delete propertyCallbacks[propertyName];
queueMicrotask(() => {
taskScheduler(() => {
for (const callback of callbacks)
callback(value);
});
Expand Down Expand Up @@ -502,7 +519,7 @@ function waitForDataListener(this: AsyncIterator<any>, eventName: string) {
this.removeListener('newListener', waitForDataListener);
addSingleListener(this, 'readable', emitData);
if (this.readable)
queueMicrotask(() => emitData.call(this));
taskScheduler(() => emitData.call(this));
}
}
// Emits new items though `data` events as long as there are `data` listeners
Expand Down Expand Up @@ -710,7 +727,7 @@ export class BufferedIterator<T> extends AsyncIterator<T> {
constructor({ maxBufferSize = 4, autoStart = true } = {}) {
super(INIT);
this.maxBufferSize = maxBufferSize;
queueMicrotask(() => this._init(autoStart));
taskScheduler(() => this._init(autoStart));
}

/**
Expand Down Expand Up @@ -886,7 +903,7 @@ export class BufferedIterator<T> extends AsyncIterator<T> {
// Acquire reading lock to avoid recursive reads
if (!this._reading) {
this._reading = true;
queueMicrotask(() => {
taskScheduler(() => {
// Release reading lock so _fillBuffer` can take it
this._reading = false;
this._fillBuffer();
Expand Down Expand Up @@ -1072,7 +1089,7 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
const next = () => {
// Continue transforming until at least `count` items have been pushed
if (this._pushedCount < count && !this.closed)
queueMicrotask(() => this._readAndTransform(next, done));
taskScheduler(() => this._readAndTransform(next, done));
else
done();
};
Expand Down Expand Up @@ -1220,10 +1237,10 @@ export class SimpleTransformIterator<S, D = S> extends TransformIterator<S, D> {
/* Tries to read and transform items */
protected _read(count: number, done: () => void) {
const next = () => this._readAndTransformSimple(count, nextAsync, done);
this._readAndTransformSimple(count, nextAsync, done);
function nextAsync() {
queueMicrotask(next);
taskScheduler(next);
}
this._readAndTransformSimple(count, nextAsync, done);
}

/* Reads and transform items */
Expand Down Expand Up @@ -1858,3 +1875,5 @@ type SourceExpression<T> =

type InternalSource<T> =
AsyncIterator<T> & { _destination: AsyncIterator<any> };

type TaskScheduler = (task: () => void) => void;
4 changes: 2 additions & 2 deletions test/AsyncIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import {
CLOSED,
ENDED,
DESTROYED,
scheduleTask,
} from '../asynciterator.mjs';

import { EventEmitter } from 'events';
import queueMicrotask from 'queue-microtask';

describe('AsyncIterator', () => {
describe('The AsyncIterator module', () => {
Expand Down Expand Up @@ -408,7 +408,7 @@ describe('AsyncIterator', () => {
before(() => {
iterator = new AsyncIterator();
captureEvents(iterator, 'data', 'readable', 'end');
iterator._destroy = (error, callback) => queueMicrotask(callback);
iterator._destroy = (error, callback) => scheduleTask(callback);
iterator.destroy();
});

Expand Down
20 changes: 9 additions & 11 deletions test/BufferedIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import {
AsyncIterator,
BufferedIterator,
CLOSED,
scheduleTask,
} from '../asynciterator.mjs';

import { EventEmitter } from 'events';
import queueMicrotask from 'queue-microtask';

describe('BufferedIterator', () => {
describe('The BufferedIterator function', () => {
Expand Down Expand Up @@ -301,7 +301,7 @@ describe('BufferedIterator', () => {
function createIterator(options) {
const iterator = new BufferedIterator(options);
iterator._read = function (count, done) {
queueMicrotask(() => {
scheduleTask(() => {
this.close();
done();
});
Expand Down Expand Up @@ -660,8 +660,8 @@ describe('BufferedIterator', () => {
describe('after `read` is called and the iterator has been closed', () => {
before(() => {
iterator.read();
queueMicrotask(() => { _readDone(); });
queueMicrotask(() => { iterator.close(); });
scheduleTask(() => { _readDone(); });
scheduleTask(() => { iterator.close(); });
});

it('should have emitted the `end` event', () => {
Expand Down Expand Up @@ -1292,7 +1292,7 @@ describe('BufferedIterator', () => {
const iterator = new BufferedIterator(options);
iterator._read = function (count, done) {
this._push('a');
queueMicrotask(() => {
scheduleTask(() => {
this._push('b');
this._push('c');
done();
Expand Down Expand Up @@ -1408,9 +1408,7 @@ describe('BufferedIterator', () => {
before(done => {
iterator = new BufferedIterator({ autoStart: false });
iterator._read = (count, callback) => { readDone = callback; };
// `queueMicrotask` because reading directly after construction does not call `_read`;
// this is necessary to enable attaching a `_begin` hook after construction
queueMicrotask(() => { iterator.read(); done(); });
scheduleTask(() => { iterator.read(); done(); });
});

it('should cause an exception', () => {
Expand Down Expand Up @@ -1637,7 +1635,7 @@ describe('BufferedIterator', () => {
before(() => {
iterator = new BufferedIterator();
iterator._begin = function (done) {
queueMicrotask(() => {
scheduleTask(() => {
this._push('x');
this._push('y');
done();
Expand Down Expand Up @@ -1947,7 +1945,7 @@ describe('BufferedIterator', () => {
done();
};
iterator._flush = function (done) {
queueMicrotask(() => {
scheduleTask(() => {
this._push('x');
this._push('y');
done();
Expand Down Expand Up @@ -2206,7 +2204,7 @@ describe('BufferedIterator', () => {
done();
};
iterator._flush = function (done) {
queueMicrotask(() => {
scheduleTask(() => {
this._push('x');
this._push('y');
done();
Expand Down
14 changes: 7 additions & 7 deletions test/MultiTransformIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import {
EmptyIterator,
SingletonIterator,
ArrayIterator,
scheduleTask,
} from '../asynciterator.mjs';

import { EventEmitter } from 'events';
import queueMicrotask from 'queue-microtask';

describe('MultiTransformIterator', () => {
describe('The MultiTransformIterator function', () => {
Expand Down Expand Up @@ -146,7 +146,7 @@ describe('MultiTransformIterator', () => {
iterator = new MultiTransformIterator(source);
iterator._createTransformer = sinon.spy(() => {
const transformer = new BufferedIterator();
setImmediate(() => transformer.close());
setTimeout(() => transformer.close(), 0);
return transformer;
});
});
Expand Down Expand Up @@ -181,7 +181,7 @@ describe('MultiTransformIterator', () => {
iterator = new MultiTransformIterator(source);
iterator._createTransformer = sinon.spy(item => {
const transformer = new BufferedIterator();
queueMicrotask(() => {
scheduleTask(() => {
transformer._push(`${item}1`);
transformer.close();
});
Expand Down Expand Up @@ -209,7 +209,7 @@ describe('MultiTransformIterator', () => {
iterator = new MultiTransformIterator(source);
iterator._createTransformer = sinon.spy(item => {
const transformer = new BufferedIterator();
queueMicrotask(() => {
scheduleTask(() => {
transformer._push(`${item}1`);
transformer._push(`${item}2`);
transformer._push(`${item}3`);
Expand Down Expand Up @@ -300,7 +300,7 @@ describe('MultiTransformIterator', () => {
iterator = new MultiTransformIterator(source);
iterator._createTransformer = sinon.spy(item => {
const transformer = new BufferedIterator();
queueMicrotask(() => {
scheduleTask(() => {
transformer.emit('error', new Error(`Error ${item}`));
});
return transformer;
Expand All @@ -319,7 +319,7 @@ describe('MultiTransformIterator', () => {
source = new ArrayIterator(['a', 'b', 'c', 'd', 'e', 'f']);
const multiTransform = sinon.spy(item => {
const transformer = new BufferedIterator();
queueMicrotask(() => {
scheduleTask(() => {
transformer._push(`${item}1`);
transformer._push(`${item}2`);
transformer._push(`${item}3`);
Expand Down Expand Up @@ -356,7 +356,7 @@ describe('MultiTransformIterator', () => {
source = new ArrayIterator(['a', 'b', 'c', 'd', 'e', 'f']);
const multiTransform = sinon.spy(item => {
const transformer = new BufferedIterator();
queueMicrotask(() => {
scheduleTask(() => {
transformer._push(`${item}1`);
transformer._push(`${item}2`);
transformer._push(`${item}3`);
Expand Down
4 changes: 2 additions & 2 deletions test/SimpleTransformIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import {
EmptyIterator,
ArrayIterator,
IntegerIterator,
scheduleTask,
} from '../asynciterator.mjs';

import { EventEmitter } from 'events';
import queueMicrotask from 'queue-microtask';

describe('SimpleTransformIterator', () => {
describe('The SimpleTransformIterator function', () => {
Expand Down Expand Up @@ -163,7 +163,7 @@ describe('SimpleTransformIterator', () => {
source = new ArrayIterator(['a', 'b', 'c']);
transform = sinon.spy(function (item, done) {
this._push(item + (++i));
queueMicrotask(done);
scheduleTask(done);
});
iterator = new SimpleTransformIterator(source, transform);
});
Expand Down
51 changes: 51 additions & 0 deletions test/TaskScheduler-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import {
scheduleTask,
getTaskScheduler,
setTaskScheduler,
} from '../asynciterator.mjs';

describe('TaskScheduler', () => {
describe('scheduleTask', () => {
it('is a function', () => {
expect(scheduleTask).to.be.an.instanceof(Function);
});

it('schedules a task', done => {
scheduleTask(done);
});
});

describe('getTaskScheduler', () => {
it('is a function', () => {
expect(getTaskScheduler).to.be.an.instanceof(Function);
});

it('returns a task scheduler', done => {
const scheduler = getTaskScheduler();
scheduler(done);
});
});

describe('setTaskScheduler', () => {
it('is a function', () => {
expect(setTaskScheduler).to.be.an.instanceof(Function);
});

it('allows setting the task scheduler', () => {
const scheduler = getTaskScheduler();
expect(getTaskScheduler()).to.equal(scheduler);

const newScheduler = sinon.spy();
setTaskScheduler(newScheduler);
expect(getTaskScheduler()).to.equal(newScheduler);
expect(newScheduler).to.not.have.been.called;

const task = sinon.spy();
scheduleTask(task);
expect(newScheduler).to.have.been.calledOnce;
expect(newScheduler).to.have.been.calledWith(task);

setTaskScheduler(scheduler);
});
});
});
6 changes: 3 additions & 3 deletions test/TransformIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import {
ArrayIterator,
TransformIterator,
wrap,
scheduleTask,
} from '../asynciterator.mjs';

import { EventEmitter } from 'events';
import queueMicrotask from 'queue-microtask';

describe('TransformIterator', () => {
describe('The TransformIterator function', () => {
Expand Down Expand Up @@ -476,7 +476,7 @@ describe('TransformIterator', () => {
before(() => {
iterator = new TransformIterator(source = new ArrayIterator(['a', 'b', 'c']));
iterator._transform = function (item, done) {
queueMicrotask(() => {
scheduleTask(() => {
iterator._push(`${item}1`);
iterator._push(`${item}2`);
done();
Expand Down Expand Up @@ -588,7 +588,7 @@ describe('TransformIterator', () => {
iterator = new TransformIterator(source);
iterator._transform = sinon.spy(function (item, done) {
this._push(item + (++i));
queueMicrotask(done);
scheduleTask(done);
});
});

Expand Down
Loading

0 comments on commit accdde5

Please sign in to comment.