Skip to content

Commit

Permalink
Add MappingIterator.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeswr authored and RubenVerborgh committed Jul 4, 2022
1 parent d036485 commit 42cae48
Show file tree
Hide file tree
Showing 6 changed files with 1,171 additions and 164 deletions.
128 changes: 114 additions & 14 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
taskScheduler = scheduler;
}

/**
 * Returns a function that calls `fn` with `self` as its `this` pointer.
 * When no `self` is given, `fn` itself is returned unchanged.
 */
function bind<T extends Function>(fn: T, self?: object): T {
  if (!self)
    return fn;
  return fn.bind(self);
}

/**
ID of the INIT state.
An iterator is initializing if it is preparing main item generation.
Expand Down Expand Up @@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter {
@param {object?} self The `this` pointer for the callback
*/
forEach(callback: (item: T) => void, self?: object) {
this.on('data', self ? callback.bind(self) : callback);
this.on('data', bind(callback, self));
}

/**
Expand Down Expand Up @@ -455,8 +460,8 @@ export class AsyncIterator<T> extends EventEmitter {
@param {object?} self The `this` pointer for the mapping function
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return this.transform({ map: self ? map.bind(self) : map });
map<D>(map: MapFunction<T, D>, self?: any): AsyncIterator<D> {
return new MappingIterator(this, bind(map, self));
}

/**
Expand All @@ -469,7 +474,8 @@ export class AsyncIterator<T> extends EventEmitter {
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return this.transform({ filter: self ? filter.bind(self) : filter });
filter = bind(filter, self);
return this.map(item => filter(item) ? item : null);
}

/**
Expand Down Expand Up @@ -510,7 +516,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
*/
skip(offset: number): AsyncIterator<T> {
return this.transform({ offset });
return this.map(item => offset-- > 0 ? null : item);
}

/**
Expand Down Expand Up @@ -575,6 +581,14 @@ function addSingleListener(source: EventEmitter, eventName: string,
source.on(eventName, listener);
}

/**
 * Validates an AsyncIterator for use as a source within another AsyncIterator.
 * Throws if `source` is missing or lacks `read`/`on` functions, or if it
 * already has a `_destination` while `allowDestination` is false.
 * Returns the same object, typed as an internal source.
 */
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
  // A usable source must exist and expose both `read` and `on`
  if (!(source && isFunction(source.read) && isFunction(source.on)))
    throw new Error(`Invalid source: ${source}`);

  // Unless explicitly allowed, a source may only feed one destination
  if (!allowDestination) {
    if ((source as any)._destination)
      throw new Error('The source already has a destination');
  }

  return source as InternalSource<S>;
}

/**
An iterator that doesn't emit any items.
Expand Down Expand Up @@ -777,9 +791,90 @@ export class IntegerIterator extends AsyncIterator<number> {
}
}

/**
* A synchronous mapping function that turns a source item of type `S`
* into a destination item of type `D` (which defaults to `S`).
* Returning `null` signals that nothing should be emitted for that item.
*/
export type MapFunction<S, D = S> = (item: S) => D | null;

/**
 * Identity mapping: hands back the very item it was given, untouched.
 * @param item The item to return
 * @returns The same `item` reference or value
 */
export function identity<S>(item: S): typeof item {
  return item;
}

/**
  An iterator that synchronously transforms every item from its source
  by applying a mapping function.
  Items for which the mapping function returns `null` are not emitted.
  @extends module:asynciterator.AsyncIterator
*/
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
  protected readonly _map: MapFunction<S, D>;
  protected readonly _source: InternalSource<S>;
  protected readonly _destroySource: boolean;

  /**
   * Applies the given mapping to the source iterator.
   * @param source The iterator whose items are mapped; it must not already have a destination
   * @param map The mapping function (defaults to the identity function)
   * @param options Settings; the source is destroyed on end unless `destroySource` is `false`
   */
  constructor(
    source: AsyncIterator<S>,
    map: MapFunction<S, D> = identity as MapFunction<S, D>,
    options: SourcedIteratorOptions = {}
  ) {
    super();
    this._map = map;
    this._source = ensureSourceAvailable(source);
    this._destroySource = options.destroySource !== false;

    if (!source.done) {
      // Wire up the source for reading
      const wired = this._source;
      wired._destination = this;
      wired.on('end', destinationClose);
      wired.on('error', destinationEmitError);
      wired.on('readable', destinationSetReadable);
      this.readable = wired.readable;
    }
    else {
      // The source is already empty, so there is nothing to map
      this.close();
    }
  }

  /**
   * Tries to read the next item from the iterator.
   * @returns The next non-null mapped item, or `null` if none is available right now
   */
  read(): D | null {
    if (this.done)
      return null;

    const source = this._source;
    if (source.readable) {
      // Keep pulling from the source until an item maps to a non-null value
      for (let item = source.read(); item !== null; item = source.read()) {
        const mapped = this._map(item);
        if (mapped !== null)
          return mapped;
      }
    }

    // Nothing could be produced from the source at this point
    this.readable = false;

    // Close this iterator if the source is empty
    if (source.done)
      this.close();
    return null;
  }

  /*
    Detaches from the source iterator (listeners and destination link),
    destroys it when configured to do so, and then ends this iterator.
    The `destroy` flag is forwarded to the parent implementation.
  */
  protected _end(destroy: boolean) {
    const source = this._source;
    source.removeListener('end', destinationClose);
    source.removeListener('error', destinationEmitError);
    source.removeListener('readable', destinationSetReadable);
    delete source._destination;
    if (this._destroySource)
      source.destroy();
    super._end(destroy);
  }
}

/**
A iterator that maintains an internal buffer of items.
An iterator that maintains an internal buffer of items.
This class serves as a base class for other iterators
with a typically complex item generation process.
@extends module:asynciterator.AsyncIterator
Expand Down Expand Up @@ -1150,14 +1245,10 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
@param {object} source The source to validate
@param {boolean} allowDestination Whether the source can already have a destination
*/
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false) {
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false): InternalSource<S> {
if (this._source || typeof this._createSource !== 'undefined')
throw new Error('The source cannot be changed after it has been set');
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new Error(`Invalid source: ${source}`);
if (!allowDestination && (source as any)._destination)
throw new Error('The source already has a destination');
return source as InternalSource<S>;
return ensureSourceAvailable(source, allowDestination);
}

/**
Expand Down Expand Up @@ -1240,9 +1331,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
}
}

// Listener invoked with a source as `this`: marks that source's destination as readable.
function destinationSetReadable<S>(this: InternalSource<S>) {
  const destination = this._destination!;
  destination.readable = true;
}
// Listener invoked with a source as `this`: re-emits the given error on that source's destination.
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
  const destination = this._destination!;
  destination.emit('error', error);
}
// Listener invoked with a source as `this`: closes that source's destination.
function destinationClose<S>(this: InternalSource<S>) {
  const destination = this._destination!;
  destination.close();
}
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
(this._destination as any)._closeWhenDone();
}
Expand Down Expand Up @@ -1956,15 +2053,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
}

/**
 * Options for iterators that read from a source iterator.
 */
export interface SourcedIteratorOptions {
// Whether the source should be destroyed when the iterator ends;
// MappingIterator treats any value other than `false` as "destroy the source".
destroySource?: boolean;
}

export interface BufferedIteratorOptions {
maxBufferSize?: number;
autoStart?: boolean;
}

export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
source?: SourceExpression<S>;
optional?: boolean;
destroySource?: boolean;
}

export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"test:microtask": "npm run mocha",
"test:immediate": "npm run mocha -- --require test/config/useSetImmediate.js",
"mocha": "c8 mocha",
"lint": "eslint asynciterator.ts test",
"lint": "eslint asynciterator.ts test perf",
"docs": "npm run build:module && npm run jsdoc",
"jsdoc": "jsdoc -c jsdoc.json"
},
Expand Down
6 changes: 6 additions & 0 deletions perf/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
rules: {
no-console: off,
},
}

61 changes: 61 additions & 0 deletions perf/MappingIterator-perf.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { ArrayIterator, range } from '../dist/asynciterator.js';

// Does nothing; attached as a 'data' listener purely to drain an iterator.
function noop() {
  /* intentionally empty */
}

/**
 * Drains `iterator` and logs how long that took, prefixed by `description`.
 *
 * The timer starts right before the 'data' listener is attached and stops
 * when the iterator emits 'end'.
 *
 * NOTE(review): `warmupIterator` is accepted but never consumed here; it is
 * kept only so existing call sites continue to work — confirm whether a
 * warm-up drain was intended.
 *
 * @param {object} warmupIterator Currently unused
 * @param {object} iterator The iterator to drain and time
 * @param {string} description Label printed in front of the elapsed milliseconds
 * @returns {Promise<void>} Resolves on 'end'; rejects if the iterator emits 'error'
 */
function perf(warmupIterator, iterator, description) {
  return new Promise((resolve, reject) => {
    const start = performance.now();
    iterator.on('data', noop);
    // Without an 'error' listener a failing iterator would throw from `emit`
    // and this promise would never settle
    iterator.on('error', reject);
    iterator.on('end', () => {
      console.log(description, performance.now() - start);
      resolve();
    });
  });
}

/**
 * Drains `iterator` without measuring anything.
 *
 * @param {object} iterator The iterator to consume
 * @returns {Promise<void>} Resolves on 'end'; rejects if the iterator emits 'error'
 */
function run(iterator) {
  return new Promise((resolve, reject) => {
    iterator.on('data', noop);
    // Surface iterator failures instead of leaving the promise pending
    iterator.on('error', reject);
    iterator.on('end', () => resolve());
  });
}

// Creates an ArrayIterator over the integers 0 through 19,999,999.
function baseIterator() {
  const numbers = Array.from({ length: 20000000 }, (_, index) => index);
  return new ArrayIterator(numbers);
}

// Stacks 20 identity `map` stages on top of a fresh base iterator.
// When `filter` is truthy, each map stage is followed by a `filter` stage
// that only keeps items divisible by (stage index + 2).
function createMapped(filter) {
  let chain = baseIterator();
  for (let stage = 0; stage < 20; stage++) {
    const mapped = chain.map(item => item);
    chain = filter ?
      mapped.filter(item => item % (stage + 2) === 0) :
      mapped;
  }
  return chain;
}

// Benchmark entry point: runs the three measurements sequentially.
(async () => {
// Drain a plain base iterator once before measuring anything
await run(baseIterator()); // warm-up run

// Benchmarks 1 and 2: one long chain of 20 maps (optionally with 20 filters)
// over 20,000,000 elements. The first argument is not consumed by `perf`.
await perf(baseIterator(), createMapped(), '20000000 elems 20 maps\t\t\t\t\t');
await perf(createMapped(true), createMapped(true), '20000000 elems 20 maps 20 filter\t\t\t');

// Benchmark 3: many short-lived iterators, to measure per-iterator
// construction and teardown cost rather than per-item throughput
const now = performance.now();
for (let j = 0; j < 100_000; j++) {
let it = range(1, 100);
for (let k = 0; k < 5; k++)
it = it.map(item => item);

// Drain the chain; each iterator is fully consumed before the next is created
await new Promise((resolve, reject) => {
it.on('data', () => null);
it.on('end', resolve);
it.on('error', reject);
});
}
console.log('100_000 iterators each with 5 maps and 100 elements\t', performance.now() - now);
})();
Loading

0 comments on commit 42cae48

Please sign in to comment.