Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Feat/mapping iterator #75

Merged
merged 1 commit into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 117 additions & 15 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
taskScheduler = scheduler;
}

// Returns a function that calls `fn` with `self` as `this` pointer. */
function bind<T extends Function>(fn: T, self?: object): T {
return self ? fn.bind(self) : fn;
}

/**
ID of the INIT state.
An iterator is initializing if it is preparing main item generation.
Expand Down Expand Up @@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter {
@param {object?} self The `this` pointer for the callback
*/
forEach(callback: (item: T) => void, self?: object) {
this.on('data', self ? callback.bind(self) : callback);
this.on('data', bind(callback, self));
}

/**
Expand Down Expand Up @@ -455,8 +460,8 @@ export class AsyncIterator<T> extends EventEmitter {
@param {object?} self The `this` pointer for the mapping function
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return this.transform({ map: self ? map.bind(self) : map });
map<D>(map: MapFunction<T, D>, self?: any): AsyncIterator<D> {
return new MappingIterator(this, bind(map, self));
}

/**
Expand All @@ -469,7 +474,9 @@ export class AsyncIterator<T> extends EventEmitter {
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return this.transform({ filter: self ? filter.bind(self) : filter });
return this.map(function (this: any, item: T) {
return filter.call(self || this, item) ? item : null;
});
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeswr Found this nasty bug still when restoring all of the main tests. (The this pointer filter is called with would be wrong because of the nested function.)

}

/**
Expand Down Expand Up @@ -510,7 +517,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
*/
skip(offset: number): AsyncIterator<T> {
return this.transform({ offset });
return this.map(item => offset-- > 0 ? null : item);
}

/**
Expand Down Expand Up @@ -777,9 +784,99 @@ export class IntegerIterator extends AsyncIterator<number> {
}
}

/**
* A synchronous mapping function from one element to another.
* A return value of `null` means that nothing should be emitted for a particular item.
*/
export type MapFunction<S, D = S> = (item: S) => D | null;

/** Function that maps an element to itself. */
export function identity<S>(item: S): typeof item {
return item;
}


/**
An iterator that synchronously transforms every item from its source
by applying a mapping function.
@extends module:asynciterator.AsyncIterator
*/
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
protected readonly _map: MapFunction<S, D>;
protected readonly _source: InternalSource<S>;
protected readonly _destroySource: boolean;

/**
* Applies the given mapping to the source iterator.
*/
constructor(
source: AsyncIterator<S>,
map: MapFunction<S, D> = identity as MapFunction<S, D>,
options: SourcedIteratorOptions = {}
) {
super();
this._map = map;
this._source = ensureSourceAvailable(source);
this._destroySource = options.destroySource !== false;

// Close if the source is already empty
if (source.done) {
this.close();
}
// Otherwise, wire up the source for reading
else {
this._source._destination = this;
this._source.on('end', destinationClose);
this._source.on('error', destinationEmitError);
this._source.on('readable', destinationSetReadable);
this.readable = this._source.readable;
RubenVerborgh marked this conversation as resolved.
Show resolved Hide resolved
}
}

/* Tries to read the next item from the iterator. */
read(): D | null {
if (!this.done) {
// Try to read an item that maps to a non-null value
if (this._source.readable) {
let item: S | null, mapped: D | null;
while ((item = this._source.read()) !== null) {
if ((mapped = this._map(item)) !== null)
return mapped;
}
}
this.readable = false;

// Close this iterator if the source is empty
if (this._source.done)
this.close();
}
return null;
}

/* Cleans up the source iterator and ends. */
protected _end(destroy: boolean) {
this._source.removeListener('end', destinationClose);
this._source.removeListener('error', destinationEmitError);
this._source.removeListener('readable', destinationSetReadable);
delete this._source._destination;
if (this._destroySource)
this._source.destroy();
RubenVerborgh marked this conversation as resolved.
Show resolved Hide resolved
super._end(destroy);
}
}

// Validates an AsyncIterator for use as a source within another AsyncIterator
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new Error(`Invalid source: ${source}`);
if (!allowDestination && (source as any)._destination)
throw new Error('The source already has a destination');
return source as InternalSource<S>;
}


/**
A iterator that maintains an internal buffer of items.
An iterator that maintains an internal buffer of items.
This class serves as a base class for other iterators
with a typically complex item generation process.
@extends module:asynciterator.AsyncIterator
Expand All @@ -797,7 +894,7 @@ export class BufferedIterator<T> extends AsyncIterator<T> {
@param {integer} [options.maxBufferSize=4] The number of items to preload in the internal buffer
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction
*/
constructor({ maxBufferSize = 4, autoStart = true } = {}) {
constructor({ maxBufferSize = 4, autoStart = true }: BufferedIteratorOptions = {}) {
super(INIT);
this.maxBufferSize = maxBufferSize;
taskScheduler(() => this._init(autoStart));
Expand Down Expand Up @@ -1150,14 +1247,10 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
@param {object} source The source to validate
@param {boolean} allowDestination Whether the source can already have a destination
*/
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false) {
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false): InternalSource<S> {
if (this._source || typeof this._createSource !== 'undefined')
throw new Error('The source cannot be changed after it has been set');
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new Error(`Invalid source: ${source}`);
if (!allowDestination && (source as any)._destination)
throw new Error('The source already has a destination');
return source as InternalSource<S>;
return ensureSourceAvailable(source, allowDestination);
}

/**
Expand Down Expand Up @@ -1240,9 +1333,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
}
}

function destinationSetReadable<S>(this: InternalSource<S>) {
this._destination!.readable = true;
}
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
this._destination!.emit('error', error);
}
function destinationClose<S>(this: InternalSource<S>) {
this._destination!.close();
}
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
(this._destination as any)._closeWhenDone();
}
Expand Down Expand Up @@ -1956,15 +2055,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
}

export interface SourcedIteratorOptions {
destroySource?: boolean;
}

export interface BufferedIteratorOptions {
maxBufferSize?: number;
autoStart?: boolean;
}

export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
source?: SourceExpression<S>;
optional?: boolean;
destroySource?: boolean;
}

export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"test:microtask": "npm run mocha",
"test:immediate": "npm run mocha -- --require test/config/useSetImmediate.js",
"mocha": "c8 mocha",
"lint": "eslint asynciterator.ts test",
"lint": "eslint asynciterator.ts test perf",
"docs": "npm run build:module && npm run jsdoc",
"jsdoc": "jsdoc -c jsdoc.json"
},
Expand Down
5 changes: 5 additions & 0 deletions perf/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
rules: {
no-console: off,
},
}
60 changes: 60 additions & 0 deletions perf/MappingIterator-perf.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { ArrayIterator, range } from '../dist/asynciterator.js';

function noop() {
// empty function to drain an iterator
}

async function perf(warmupIterator, iterator, description) {
return new Promise(res => {
const now = performance.now();
iterator.on('data', noop);
iterator.on('end', () => {
console.log(description, performance.now() - now);
res();
});
});
}

function run(iterator) {
return new Promise(res => {
iterator.on('data', noop);
iterator.on('end', () => {
res();
});
});
}

function baseIterator() {
return new ArrayIterator(new Array(20_000_000).fill(true).map((_, i) => i));
}

function createMapped(filter) {
let iterator = baseIterator();
for (let j = 0; j < 20; j++) {
iterator = iterator.map(item => item);
if (filter)
iterator = iterator.filter(item => item % (j + 2) === 0);
}
return iterator;
}

(async () => {
await run(baseIterator()); // warm-up run

await perf(baseIterator(), createMapped(), '20,000,000 elems 20 maps\t\t\t\t\t');
await perf(createMapped(true), createMapped(true), '20,000,000 elems 20 maps 20 filter\t\t\t');

const now = performance.now();
for (let j = 0; j < 100_000; j++) {
let it = range(1, 100);
for (let k = 0; k < 5; k++)
it = it.map(item => item);

await new Promise((resolve, reject) => {
it.on('data', () => null);
it.on('end', resolve);
it.on('error', reject);
});
}
console.log('100,000 iterators each with 5 maps and 100 elements\t', performance.now() - now);
})();
1 change: 0 additions & 1 deletion test/.eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@
callback-return: 0, // For testing incorrect usage
},
}

Loading