Skip to content

Commit

Permalink
WIP: Review.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Jun 24, 2022
1 parent ba64a35 commit e8b1a7c
Show file tree
Hide file tree
Showing 8 changed files with 743 additions and 754 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
if: ${{ matrix.node-version != '10.x' }}
- run: npx c8 --reporter=lcov mocha
- uses: coverallsapp/github-action@master
if: ${{ matrix.node-version != '10.x' }}
with:
github-token: ${{ secrets.github_token }}
flag-name: run-${{ matrix.node-version }}
Expand Down
219 changes: 127 additions & 92 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
}

/** Binds a function to an object */
function bind(fn: Function, self: any) {
function bind(fn: Function, self?: object) {
return self ? fn.bind(self) : fn;
}

Expand Down Expand Up @@ -457,13 +457,12 @@ export class AsyncIterator<T> extends EventEmitter {
Maps items from this iterator using the given function.
After this operation, only read the returned iterator instead of the current one.
@param {Function} map A mapping function to call on this iterator's (remaining) items.
A `null` value indicates that nothing should be returned for a particular item..
A `null` value indicates that nothing should be returned for a particular item.
@param {object?} self The `this` pointer for the mapping function
@param {boolean?} close Close the iterator after an item is mapped to null
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: (item: T, it: AsyncIterator<any>) => D | null, self?: any): AsyncIterator<D> {
return new MappingIterator<T, D>(this, [bind(map, self)]);
return new MappingIterator<T, D>(this, bind(map, self));
}

/**
Expand Down Expand Up @@ -785,9 +784,121 @@ export class IntegerIterator extends AsyncIterator<number> {
}
}

/**
* A synchronous mapping function from one element to another.
* The iterator performing the mapping is passed as a second argument.
*/
export type MapFunction<S, D = S, I extends AsyncIterator<D> = AsyncIterator<D>> =
(item: S, iterator: I) => D | null;

/**
An iterator that calls a synchronous mapping function
on every item from its source iterator.
@extends module:asynciterator.AsyncIterator
*/
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
protected _source: AsyncIterator<S>;
private readonly _destroySource: boolean;
private readonly _mappings: MapFunction<any, any, MappingIterator<S, D>>[];
private readonly _mappingRoot: InternalSource<any>;

// This is wrong: readable should be set by listening to source events
get readable() {
return (this._state < CLOSED) && this._source.readable;
}

/**
* Applies the given mapping to the source iterator.
*/
constructor(
source: AsyncIterator<S>,
mapping?: MapFunction<S, D, MappingIterator<S, D>>,
options?: SourcedIteratorOptions,
);

/**
* Applies the given list of mappings to the mapping root.
*
* This is an optimization for
* root.map(f1).map(f2).map(f3)
* where the combined mapping x => f3(f2(f1(x)))
* is applied to root rather than to the intermediate sources.
*/
constructor(
source: AsyncIterator<S>,
mappings: MapFunction<any, any, MappingIterator<S, D>>[],
mappingRoot: AsyncIterator<any>,
options?: SourcedIteratorOptions,
);

constructor(
source: AsyncIterator<S>,
mappings: MapFunction<S, D, MappingIterator<S, D>> |
MapFunction<any, any, MappingIterator<S, D>>[] = [],
mappingRoot?: AsyncIterator<any> | SourcedIteratorOptions,
options: SourcedIteratorOptions = {},
) {
super();
// Resolve optional parameters
if (!isEventEmitter(mappingRoot)) {
if (mappingRoot)
options = mappingRoot;
mappingRoot = source;
}
this._source = source;
this._mappings = isFunction(mappings) ? [mappings] : mappings;
this._mappingRoot = mappingRoot as InternalSource<any>;
this._destroySource = options.destroySource !== false;

if (mappingRoot.done) {
this.close();
}
else {
_validateSource(mappingRoot);
this._mappingRoot._destination = this;
this._mappingRoot.on('end', destinationClose);
this._mappingRoot.on('error', destinationEmitError);
this._mappingRoot.on('readable', destinationEmitReadable);
}
}

read(): D | null {
let mapped : any = null;
while (mapped === null && (mapped = this._source.read()) !== null) {
for (let i = 0; i < this._mappings.length; i++) {
mapped = this._mappings[i](mapped, this);
if (mapped === null)
break;
}
}
return mapped;
}

map<K>(map: (item: D, it: AsyncIterator<any>) => K | null, self?: any): AsyncIterator<K> {
return new MappingIterator<S, K>(this._source, [...this._mappings, bind(map, self)], this);
}

public close() {
if (this._destroySource)
this._mappingRoot.destroy();
super.close();
}

/* Cleans up the source iterator and ends. */
protected _end(destroy: boolean) {
this._mappingRoot.removeListener('end', destinationClose);
this._mappingRoot.removeListener('error', destinationEmitError);
this._mappingRoot.removeListener('readable', destinationEmitReadable);
delete this._mappingRoot._destination;
if (this._destroySource)
this._mappingRoot.destroy();
super._end(destroy);
}
}


/**
A iterator that maintains an internal buffer of items.
An iterator that maintains an internal buffer of items.
This class serves as a base class for other iterators
with a typically complex item generation process.
@extends module:asynciterator.AsyncIterator
Expand Down Expand Up @@ -1252,9 +1363,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
}
}

function destinationEmitReadable<S>(this: InternalSource<S>) {
this._destination!.emit('readable');
}
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
this._destination!.emit('error', error);
}
function destinationClose<S>(this: InternalSource<S>) {
this._destination!.close();
}
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
(this._destination as any)._closeWhenDone();
}
Expand All @@ -1263,91 +1380,6 @@ function destinationFillBuffer<S>(this: InternalSource<S>) {
(this._destination as any)._fillBuffer();
}

export class MappingIterator<T, D = T> extends AsyncIterator<D> {
private _destroySource: boolean;

get readable() {
return this.source.readable;
}

set readable(readable) {
this.source.readable = readable;
}

constructor(
protected source: AsyncIterator<T>,
private transforms: ((item: any, iterator: AsyncIterator<any>) => any)[],
private upstream: AsyncIterator<any> = source,
options: { destroySource?: boolean } = {}
) {
// Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing
// listeners to the original source
super();
this._destroySource = options.destroySource !== false;
if (upstream.done) {
this.close();
}
else {
_validateSource(upstream);
// @ts-ignore
upstream._destination = this;
upstream.on('end', onSourceEnd);
upstream.on('error', onSourceError);
upstream.on('readable', onSourceReadable);
}
}

read(): D | null {
const { source, transforms } = this;
let item, i;
while ((item = source.read()) !== null) {
i = transforms.length;
// Applies each of the transforms in sequence, and terminates
// early if a transform returns null
//
// Do not use a for-of loop here, it slows down transformations
// by approximately a factor of 2.
while (i-- >= 1 && (item = transforms[i](item, this)) !== null)
;
if (item !== null)
return item;
}
return null;
}

map<K>(map: (item: D, it: AsyncIterator<any>) => K | null, self?: any): AsyncIterator<K> {
return new MappingIterator<T, K>(this.source, [bind(map, self), ...this.transforms], this);
}

destroy(cause?: Error): void {
this.upstream.destroy(cause);
super.destroy(cause);
}

public close() {
this.upstream.removeListener('end', onSourceEnd);
this.upstream.removeListener('error', onSourceError);
this.upstream.removeListener('readable', onSourceReadable);
if (this._destroySource)
this.upstream.destroy();
scheduleTask(() => {
// @ts-ignore
delete this.upstream._destination;
delete this.source;
});
super.close();
}
}

function onSourceError<S>(this: InternalSource<S>, error: Error) {
this._destination.emit('error', error);
}
function onSourceEnd<S>(this: InternalSource<S>) {
this._destination.close();
}
function onSourceReadable<S>(this: InternalSource<S>) {
this._destination.emit('readable');
}

/**
An iterator that generates items based on a source iterator
Expand Down Expand Up @@ -2043,15 +2075,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
}

export interface SourcedIteratorOptions {
destroySource?: boolean;
}

export interface BufferedIteratorOptions {
maxBufferSize?: number;
autoStart?: boolean;
}

export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
source?: SourceExpression<S>;
optional?: boolean;
destroySource?: boolean;
}

export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
"scripts": {
"build": "npm run build:clean && npm run build:module && npm run build:commonjs && npm run build:types",
"build:clean": "rm -rf dist",
"build:module": " tsc --module es2015 && mv dist/ts-out/*.js dist && npm run build:module:import",
"build:module:import": " sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak",
"build:module": "tsc && mv dist/ts-out/*.js dist && npm run build:module:import",
"build:module:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak",
"build:commonjs": "tsc --module commonjs && ./.change-extension cjs dist/ts-out/*.js && mv dist/ts-out/*.cjs dist && npm run build:commonjs:import",
"build:commonjs:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.cjs/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.cjs/' dist/asynciterator.cjs && rm dist/*.bak",
"build:types": "tsc -d && rm dist/ts-out/*.js && mv dist/ts-out/*.d.ts dist",
Expand Down
25 changes: 0 additions & 25 deletions test.js

This file was deleted.

Loading

0 comments on commit e8b1a7c

Please sign in to comment.