-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft: Feat/mapping iterator #75
Conversation
Just added a quick performance check we have Main: This PR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good stuff, @jeswr. We're absolutely on the right track.
In general, we need to be conservative/defensive when programming derived classes, and not assume too many behaviors of them. I should probably have a list of allowed assumptions, but it's not black and white. Briefly, let's not assume that source
is not one of the implementations we've written, but rather a more unstable one. We have to assume its state and its readable
are correct, but that's about it.
asynciterator.ts
Outdated
if (this._source.done) { | ||
this.close(); | ||
return null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (this._source.done) { | |
this.close(); | |
return null; | |
} | |
if (!this._source.readable) { | |
this.readable = false; | |
if (this._source.done) | |
this.close(); | |
return null; | |
} |
^ We should be maximally well-behaved and not attempt to call .read
on the source unless it advertises itself as potentially readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RubenVerborgh Is it worth opening an issue to apply this idea to other iterators, for instance in _readAndTransform
and _readAndTransformSimple
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that TransformIterator
and SimpleTransformIterator
also accept regular Node.js Readable
instances, which do not define readable
. So the goal there is to also work with non-AsyncIterator
s (see wrap
).
We can have the same goal for MappingIterator
, but that requires different tests.
Thanks, @jeswr, looking really good. Amazing what two fresh pairs of eyes can do, innit? 😉 |
I think that since this is close, clean it up and then we can release. After that is done we could put most of the logic into an abstract class that I have in mind that it will look something like this (which does have all tests current passing and is over 2x faster on the current performance tests): /**
* A synchronous mapping function from one element to another.
* A return value of `null` means that nothing should be emitted for a particular item.
*/
export type MapFunction<S, D = S> = (item: S) => D | null;
/**
An iterator that calls a synchronous operation
on every item from its source iterator.
@extends module:asynciterator.AsyncIterator
*/
export abstract class SynchronousTransformIterator<S, D = S> extends AsyncIterator<D> {
private readonly _destroySource: boolean;
protected _source: InternalSource<S>;
/**
* Applies the given mapping to the source iterator.
*/
constructor(
_source: AsyncIterator<S>,
options: SourcedIteratorOptions = {}
) {
super();
this._source = _validateSource(_source);
this._destroySource = options.destroySource !== false;
if (_source.done) {
this.close();
}
else {
this._source._destination = this;
this._source.on('end', destinationClose);
this._source.on('error', destinationEmitError);
this._source.on('readable', destinationSetReadable);
// If we are given a source that is already readable
// then we need to set the state of this iterable to readable
// also as there is no guarantee that the is no forthcoming
// readable event from the source
this.readable = this._source.readable;
}
}
protected abstract safeRead(): D | null;
read(): D | null {
// Do not read the source if the current iterator is ended
if (this.done)
return null;
// A source should only be read from if readable is true
if (!this._source.readable) {
this.readable = false;
if (this._source.done)
this.close();
return null;
}
const item = this.safeRead();
if (item !== null) {
return item;
}
// This will set readable to false on the current iterator
// if the source is no longer readable
this.readable = false;
return null;
}
/* Cleans up the source iterator and ends. */
protected _end(destroy: boolean) {
this._source.removeListener('end', destinationClose);
this._source.removeListener('error', destinationEmitError);
this._source.removeListener('readable', destinationSetReadable);
delete this._source._destination;
if (this._destroySource)
this._source.destroy();
super._end(destroy);
}
}
export class MappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
constructor(
source: AsyncIterator<S>,
private readonly _map: MapFunction<S, D>,
options: SourcedIteratorOptions = {}
) {
super(source, options);
}
safeRead() {
let item: S | null;
while ((item = this._source.read()) !== null) {
const mapped = this._map(item);
if (mapped !== null)
return mapped;
}
return null;
}
map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> {
return new CompositeMappingIterator(this._source, [this._map, bind(map, self)], this);
}
}
export class CompositeMappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
constructor(
private root: AsyncIterator<S>,
private mappings: MapFunction<any, any>[] = [],
source: AsyncIterator<any>,
options: SourcedIteratorOptions = {},
) {
super(source, options);
}
safeRead() {
// TODO: See if this is actually necessary
// A source should only be read from if readable is true
if (!this.root.readable) {
this.readable = false;
// TODO: See if this should be here
if (this.root.done)
this.close();
return null;
}
let mapped : any = null;
while (mapped === null && (mapped = this.root.read()) !== null) {
for (let i = 0; i < this.mappings.length; i++) {
mapped = this.mappings[i](mapped);
if (mapped === null)
break;
}
}
return mapped;
}
map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> {
return new CompositeMappingIterator(this.root, [...this.mappings, bind(map, self)], this);
}
}
export class UntilIterator<S> extends SynchronousTransformIterator<S> {
constructor(
source: AsyncIterator<S>,
private limit: number,
options: SourcedIteratorOptions = {}
) {
super(source, options);
}
safeRead() {
if (this.limit <= 0) {
this.close();
return null;
}
this.limit -= 1;
return this._source.read();
}
} I'll open up a branch with a cleaned up, and more heavily tested version of the code above (minus the |
f75f86a
to
eeb4796
Compare
@jeswr I have done a review and have arrived at the code you can now see in this PR.
|
|
Ooh interesting - I thought those were failing before without. I think it was historical when @jacoscaz needed a similar pattern to get unit tests passing. If tests are working then I think it is safe to get rid of it provided we have good enough coverage with asynchronous |
Nope - tests still pass making the changes to convert to the
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Approved" @RubenVerborgh with one suggested change (I don't have permissions to click the actual review button :))
asynciterator.ts
Outdated
return mapped; | ||
} | ||
} | ||
else if (this._source.done) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else if (this._source.done) { | |
if (this._source.done) { |
Only one suggested change - see #75 (comment).
7b010d5
to
43c502f
Compare
Yeah; and I tend to optimize for fewer
Yes—and that's the benefit of not messing with the base
And fully agreed—the code is even leaner now. I really like what we landed on: read(): D | null {
if (!this.done) {
// Try to read an item that maps to a non-null value
if (this._source.readable) {
let item: S | null, mapped: D | null;
while ((item = this._source.read()) !== null) {
if ((mapped = this._map(item)) !== null)
return mapped;
}
}
this.readable = false;
// Close this iterator if the source is empty
if (this._source.done)
this.close();
}
return null;
} This has been a common occurrence for me with |
43c502f
to
42cae48
Compare
42cae48
to
def9c09
Compare
return this.transform({ filter: self ? filter.bind(self) : filter }); | ||
return this.map(function (this: any, item: T) { | ||
return filter.call(self || this, item) ? item : null; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeswr Found this nasty bug still when restoring all of the main tests. (The this
pointer filter
is called with would be wrong because of the nested function.)
I had very little to do with this! Great job. I'll rebase #63 ASAP. |
@jacoscaz Reviews and chats are important! See Gitter 😉 |
Nice work! @jeswr Do you have any insights into the performance impact on Comunica? |
A fresh branch, that does not use transduction, as discussed in #59.
I think this resolves all readability problems @RubenVerborgh though I'm hesitant to say that for certain until I look with fresh eyes in the morning.
Getting close (I think) to correct readable behaviour - I just need to get one more set of new tests that I just created working. The currently failing tests test the case where the upstream iterator becomes non-readable without closing.I'll come back to this in a few hrs later this evening :)