-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathasynciterator.ts
2377 lines (2131 loc) · 79.8 KB
/
asynciterator.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* An asynchronous iterator library for advanced object pipelines
* @module asynciterator
*/
import { EventEmitter } from 'events';
import { LinkedList } from './linkedlist';
import { createTaskScheduler } from './taskscheduler';
import type { Task, TaskScheduler } from './taskscheduler';
let taskScheduler: TaskScheduler = createTaskScheduler();
// Export utilities for reuse
export { LinkedList };
/** Schedules the given task for asynchronous execution. */
export function scheduleTask(task: Task): void {
taskScheduler(task);
}
/** Returns the asynchronous task scheduler. */
export function getTaskScheduler(): TaskScheduler {
return taskScheduler;
}
/** Sets the asynchronous task scheduler. */
export function setTaskScheduler(scheduler: TaskScheduler): void {
taskScheduler = scheduler;
}
/**
ID of the INIT state.
An iterator is initializing if it is preparing main item generation.
It can already produce items.
@type integer
*/
export const INIT = 1 << 0;
/**
ID of the OPEN state.
An iterator is open if it can generate new items.
@type integer
*/
export const OPEN = 1 << 1;
/**
ID of the CLOSING state.
An iterator is closing if item generation is pending but will not be scheduled again.
@type integer
*/
export const CLOSING = 1 << 2;
/**
ID of the CLOSED state.
An iterator is closed if it no longer actively generates new items.
Items might still be available.
@type integer
*/
export const CLOSED = 1 << 3;
/**
ID of the ENDED state.
An iterator has ended if no further items will become available.
The 'end' event is guaranteed to have been called when in this state.
@type integer
*/
export const ENDED = 1 << 4;
/**
ID of the DESTROYED state.
An iterator has been destroyed
after calling {@link module:asynciterator.AsyncIterator#destroy}.
The 'end' event has not been called, as pending elements were voided.
@type integer
*/
export const DESTROYED = 1 << 5;
/**
An asynchronous iterator provides pull-based access to a stream of objects.
@extends module:asynciterator.EventEmitter
*/
export class AsyncIterator<T> extends EventEmitter implements AsyncIterable<T> {
protected _state: number;
private _readable = false;
protected _properties?: { [name: string]: any };
protected _propertyCallbacks?: { [name: string]: [(value: any) => void] };
/** Creates a new `AsyncIterator`. */
constructor(initialState = OPEN) {
super();
this._state = initialState;
this.on('newListener', waitForDataListener);
}
/**
Changes the iterator to the given state if possible and necessary,
possibly emitting events to signal that change.
@protected
@param {integer} newState The ID of the new state
@param {boolean} [eventAsync=false] Whether resulting events should be emitted asynchronously
@returns {boolean} Whether the state was changed
@emits module:asynciterator.AsyncIterator.end
*/
protected _changeState(newState: number, eventAsync = false) {
// Validate the state change
const valid = newState > this._state && this._state < ENDED;
if (valid) {
this._state = newState;
// Emit the `end` event when changing to ENDED
if (newState === ENDED) {
if (!eventAsync)
this.emit('end');
else
taskScheduler(() => this.emit('end'));
}
}
return valid;
}
/**
Tries to read the next item from the iterator.
This is the main method for reading the iterator in _on-demand mode_,
where new items are only created when needed by consumers.
If no items are currently available, this methods returns `null`.
The {@link module:asynciterator.event:readable} event
will then signal when new items might be ready.
To read all items from the iterator,
switch to _flow mode_ by subscribing
to the {@link module:asynciterator.event:data} event.
When in flow mode, do not use the `read` method.
@returns {object?} The next item, or `null` if none is available
*/
read(): T | null {
return null;
}
/**
The iterator emits a `readable` event when it might have new items available
after having had no items available right before this event.
If the iterator is not in flow mode, items can be retrieved
by calling {@link module:asynciterator.AsyncIterator#read}.
@event module:asynciterator.readable
*/
/**
The iterator emits a `data` event with a new item as soon as it becomes available.
When one or more listeners are attached to the `data` event,
the iterator switches to _flow mode_,
generating and emitting new items as fast as possible.
This drains the source and might create backpressure on the consumers,
so only subscribe to this event if this behavior is intended.
In flow mode, don't use {@link module:asynciterator.AsyncIterator#read}.
To switch back to _on-demand mode_, remove all listeners from the `data` event.
You can then obtain items through `read` again.
@event module:asynciterator.data
@param {object} item The new item
*/
/**
Invokes the callback for each remaining item in the iterator.
Switches the iterator to flow mode.
@param {Function} callback A function that will be called with each item
@param {object?} self The `this` pointer for the callback
*/
forEach(callback: (item: T) => void, self?: object) {
this.on('data', bind(callback, self));
}
/**
Stops the iterator from generating new items.
Already generated items or terminating items can still be emitted.
After this, the iterator will end asynchronously.
@emits module:asynciterator.AsyncIterator.end
*/
close() {
if (this._changeState(CLOSED))
this._endAsync();
}
/**
Destroy the iterator and stop it from generating new items.
This will not do anything if the iterator was already ended or destroyed.
All internal resources will be released an no new items will be emitted,
even not already generated items.
Implementors should not override this method,
but instead implement {@link module:asynciterator.AsyncIterator#_destroy}.
@param {Error} [cause] An optional error to emit.
@emits module:asynciterator.AsyncIterator.end
@emits module:asynciterator.AsyncIterator.error Only if an error is passed.
*/
destroy(cause?: Error) {
if (!this.done) {
this._destroy(cause, error => {
cause = cause || error;
if (cause)
this.emit('error', cause);
this._end(true);
});
}
}
/**
Called by {@link module:asynciterator.AsyncIterator#destroy}.
Implementers can override this, but this should not be called directly.
@param {?Error} cause The reason why the iterator is destroyed.
@param {Function} callback A callback function with an optional error argument.
*/
protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) {
callback();
}
/**
Ends the iterator and cleans up.
Should never be called before {@link module:asynciterator.AsyncIterator#close};
typically, `close` is responsible for calling `_end`.
@param {boolean} [destroy] If the iterator should be forcefully destroyed.
@protected
@emits module:asynciterator.AsyncIterator.end
*/
protected _end(destroy = false) {
if (this._changeState(destroy ? DESTROYED : ENDED)) {
this._readable = false;
this.removeAllListeners('readable');
this.removeAllListeners('data');
this.removeAllListeners('end');
}
}
/**
Asynchronously calls `_end`.
@protected
*/
protected _endAsync() {
taskScheduler(() => this._end());
}
/**
The `end` event is emitted after the last item of the iterator has been read.
@event module:asynciterator.end
*/
/**
Gets or sets whether this iterator might have items available for read.
A value of `false` means there are _definitely_ no items available;
a value of `true` means items _might_ be available.
@type boolean
@emits module:asynciterator.AsyncIterator.readable
*/
get readable() {
return this._readable;
}
set readable(readable) {
readable = Boolean(readable) && !this.done;
// Set the readable value only if it has changed
if (this._readable !== readable) {
this._readable = readable;
// If the iterator became readable, emit the `readable` event
if (readable)
taskScheduler(() => this.emit('readable'));
}
}
/**
Gets whether the iterator has stopped generating new items.
@type boolean
@readonly
*/
get closed() {
return this._state >= CLOSING;
}
/**
Gets whether the iterator has finished emitting items.
@type boolean
@readonly
*/
get ended() {
return this._state === ENDED;
}
/**
Gets whether the iterator has been destroyed.
@type boolean
@readonly
*/
get destroyed() {
return this._state === DESTROYED;
}
/**
Gets whether the iterator will not emit anymore items,
either due to being closed or due to being destroyed.
@type boolean
@readonly
*/
get done() {
return this._state >= ENDED;
}
/* Generates a textual representation of the iterator. */
toString() {
const details = this._toStringDetails();
return `[${this.constructor.name}${details ? ` ${details}` : ''}]`;
}
/**
Generates details for a textual representation of the iterator.
@protected
*/
protected _toStringDetails() {
return '';
}
/**
Consume all remaining items of the iterator into an array that will be returned asynchronously.
@param {object} [options] Settings for array creation
@param {integer} [options.limit] The maximum number of items to place in the array.
*/
toArray(options?: { limit?: number }): Promise<T[]> {
const items: T[] = [];
const limit = typeof options?.limit === 'number' ? options.limit : Infinity;
return this.ended || limit <= 0 ? Promise.resolve(items) : new Promise<T[]>((resolve, reject) => {
// Collect and return all items up to the limit
const resolveItems = () => resolve(items);
const pushItem = (item: T) => {
items.push(item);
if (items.length >= limit) {
this.removeListener('error', reject);
this.removeListener('data', pushItem);
this.removeListener('end', resolveItems);
resolve(items);
}
};
// Start item collection
this.on('error', reject);
this.on('data', pushItem);
this.on('end', resolveItems);
});
}
/**
Retrieves the property with the given name from the iterator.
If no callback is passed, it returns the value of the property
or `undefined` if the property is not set.
If a callback is passed, it returns `undefined`
and calls the callback with the property the moment it is set.
@param {string} propertyName The name of the property to retrieve
@param {Function?} [callback] A one-argument callback to receive the property value
@returns {object?} The value of the property (if set and no callback is given)
*/
getProperty<P>(propertyName: string, callback?: (value: P) => void): P | undefined {
const properties = this._properties;
// If no callback was passed, return the property value
if (!callback)
return properties && properties[propertyName];
// If the value has been set, send it through the callback
if (properties && (propertyName in properties)) {
taskScheduler(() => callback(properties[propertyName]));
}
// If the value was not set, store the callback for when the value will be set
else {
let propertyCallbacks;
if (!(propertyCallbacks = this._propertyCallbacks))
this._propertyCallbacks = propertyCallbacks = Object.create(null);
if (propertyName in propertyCallbacks)
propertyCallbacks[propertyName].push(callback);
else
propertyCallbacks[propertyName] = [callback];
}
return undefined;
}
/**
Sets the property with the given name to the value.
@param {string} propertyName The name of the property to set
@param {object?} value The new value of the property
*/
setProperty<P>(propertyName: string, value: P) {
const properties = this._properties || (this._properties = Object.create(null));
properties[propertyName] = value;
// Execute getter callbacks that were waiting for this property to be set
const propertyCallbacks = this._propertyCallbacks || {};
const callbacks = propertyCallbacks[propertyName];
if (callbacks) {
delete propertyCallbacks[propertyName];
taskScheduler(() => {
for (const callback of callbacks)
callback(value);
});
// Remove _propertyCallbacks if no pending callbacks are left
for (propertyName in propertyCallbacks)
return;
delete this._propertyCallbacks;
}
}
/**
Retrieves all properties of the iterator.
@returns {object} An object with property names as keys.
*/
getProperties() {
const properties = this._properties;
const copy : { [name: string] : any } = {};
for (const name in properties)
copy[name] = properties[name];
return copy;
}
/**
Sets all of the given properties.
@param {object} properties Key/value pairs of properties to set
*/
setProperties(properties: { [name: string] : any }) {
for (const propertyName in properties)
this.setProperty(propertyName, properties[propertyName]);
}
/**
Copies the given properties from the source iterator.
@param {module:asynciterator.AsyncIterator} source The iterator to copy from
@param {Array} propertyNames List of property names to copy
*/
copyProperties(source: AsyncIterator<any>, propertyNames: string[]) {
for (const propertyName of propertyNames) {
source.getProperty(propertyName, value =>
this.setProperty(propertyName, value));
}
}
/**
Transforms items from this iterator.
After this operation, only read the returned iterator instead of the current one.
@param {object|Function} [options] Settings of the iterator, or the transformation function
@param {integer} [options.maxbufferSize=4] The maximum number of items to keep in the buffer
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction
@param {integer} [options.offset] The number of items to skip
@param {integer} [options.limit] The maximum number of items
@param {Function} [options.filter] A function to synchronously filter items from the source
@param {Function} [options.map] A function to synchronously transform items from the source
@param {Function} [options.transform] A function to asynchronously transform items from the source
@param {boolean} [options.optional=false] If transforming is optional, the original item is pushed when its mapping yields `null` or its transformation yields no items
@param {Array|module:asynciterator.AsyncIterator} [options.prepend] Items to insert before the source items
@param {Array|module:asynciterator.AsyncIterator} [options.append] Items to insert after the source items
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
transform<D>(options: TransformOptions<T, D>) : AsyncIterator<D> {
return new SimpleTransformIterator<T, D>(this, options);
}
/**
Maps items from this iterator using the given function.
After this operation, only read the returned iterator instead of the current one.
@param {Function} map A mapping function to call on this iterator's (remaining) items
@param {object?} self The `this` pointer for the mapping function
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: MapFunction<T, D>, self?: any): AsyncIterator<D> {
return new MappingIterator(this, bind(map, self));
}
/**
Return items from this iterator that match the filter.
After this operation, only read the returned iterator instead of the current one.
@param {Function} filter A filter function to call on this iterator's (remaining) items
@param {object?} self The `this` pointer for the filter function
@returns {module:asynciterator.AsyncIterator} A new iterator that filters items from this iterator
*/
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return this.map(function (this: any, item: T) {
return filter.call(self || this, item) ? item : null;
});
}
/**
* Returns a new iterator containing all of the unique items in the original iterator.
* @param by - The derived value by which to determine uniqueness (e.g., stringification).
Defaults to the identity function.
* @returns An iterator with duplicates filtered out.
*/
uniq(by: (item: T) => any = identity): AsyncIterator<T> {
const uniques = new Set();
return this.filter(function (this: AsyncIterator<T>, item) {
const hashed = by.call(this, item);
if (!uniques.has(hashed)) {
uniques.add(hashed);
return true;
}
return false;
});
}
/**
Prepends the items after those of the current iterator.
After this operation, only read the returned iterator instead of the current one.
@param {Array|module:asynciterator.AsyncIterator} items Items to insert before this iterator's (remaining) items
@returns {module:asynciterator.AsyncIterator} A new iterator that prepends items to this iterator
*/
prepend(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ prepend: items });
}
/**
Appends the items after those of the current iterator.
After this operation, only read the returned iterator instead of the current one.
@param {Array|module:asynciterator.AsyncIterator} items Items to insert after this iterator's (remaining) items
@returns {module:asynciterator.AsyncIterator} A new iterator that appends items to this iterator
*/
append(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ append: items });
}
/**
Surrounds items of the current iterator with the given items.
After this operation, only read the returned iterator instead of the current one.
@param {Array|module:asynciterator.AsyncIterator} prepend Items to insert before this iterator's (remaining) items
@param {Array|module:asynciterator.AsyncIterator} append Items to insert after this iterator's (remaining) items
@returns {module:asynciterator.AsyncIterator} A new iterator that appends and prepends items to this iterator
*/
surround(prepend: AsyncIteratorOrArray<T>, append: AsyncIteratorOrArray<T>): AsyncIterator<T> {
return this.transform({ prepend, append });
}
/**
Skips the given number of items from the current iterator.
The current iterator may not be read anymore until the returned iterator ends.
@param {integer} offset The number of items to skip
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
*/
skip(offset: number): AsyncIterator<T> {
return this.map(item => offset-- > 0 ? null : item);
}
/**
Limits the current iterator to the given number of items.
The current iterator may not be read anymore until the returned iterator ends.
@param {integer} limit The maximum number of items
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items
*/
take(limit: number): AsyncIterator<T> {
return this.transform({ limit });
}
/**
Limits the current iterator to the given range.
The current iterator may not be read anymore until the returned iterator ends.
@param {integer} start Index of the first item to return
@param {integer} end Index of the last item to return
@returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range
*/
range(start: number, end: number): AsyncIterator<T> {
return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) });
}
/**
Creates a copy of the current iterator,
containing all items emitted from this point onward.
Further copies can be created; they will all start from this same point.
After this operation, only read the returned copies instead of the original iterator.
@returns {module:asynciterator.AsyncIterator} A new iterator that contains all future items of this iterator
*/
clone(): ClonedIterator<T> {
return new ClonedIterator<T>(this);
}
/**
* An AsyncIterator is async iterable.
* This allows iterators to be used via the for-await syntax.
*
* In cases where the returned EcmaScript AsyncIterator will not be fully consumed,
* it is recommended to manually listen for error events on the main AsyncIterator
* to avoid uncaught error messages.
*
* @returns {ESAsyncIterator<T>} An EcmaScript AsyncIterator
*/
[Symbol.asyncIterator](): ESAsyncIterator<T> {
const it = this;
let currentResolve: null | Function = null;
let currentReject: null | Function = null;
let pendingError: null | Error = null;
it.addListener('readable', tryResolve);
it.addListener('end', tryResolve);
it.addListener('error', tryReject);
// Tries to emit an item or signal the end of the iterator
function tryResolve(): void {
if (currentResolve !== null) {
if (pendingError !== null) {
tryReject(pendingError);
}
else if (it.done) {
currentResolve({ done: true, value: undefined });
currentResolve = currentReject = null;
removeListeners();
}
else {
const value = it.read();
if (value !== null) {
currentResolve({ done: false, value });
currentResolve = currentReject = null;
}
}
}
}
// Tries to emit an error
function tryReject(error: Error) {
if (currentReject !== null) {
currentReject(error);
currentResolve = currentReject = pendingError = null;
removeListeners();
}
else if (pendingError === null) {
pendingError = error;
}
}
// Cleans up all attached listeners
function removeListeners() {
it.removeListener('readable', tryResolve);
it.removeListener('end', tryResolve);
it.removeListener('error', tryReject);
}
// An EcmaScript AsyncIterator exposes the next() function that can be invoked repeatedly
return {
next(): Promise<IteratorResult<T>> {
return new Promise<IteratorResult<T>>((resolve, reject) => {
currentResolve = resolve;
currentReject = reject;
tryResolve();
});
},
};
}
}
// Starts emitting `data` events when `data` listeners are added
function waitForDataListener(this: AsyncIterator<any>, eventName: string) {
if (eventName === 'data') {
this.removeListener('newListener', waitForDataListener);
addSingleListener(this, 'readable', emitData);
if (this.readable)
taskScheduler(() => emitData.call(this));
}
}
// Emits new items though `data` events as long as there are `data` listeners
function emitData(this: AsyncIterator<any>) {
// While there are `data` listeners and items, emit them
let item;
while (this.listenerCount('data') !== 0 && (item = this.read()) !== null)
this.emit('data', item);
// Stop draining the source if there are no more `data` listeners
if (this.listenerCount('data') === 0 && !this.done) {
this.removeListener('readable', emitData);
addSingleListener(this, 'newListener', waitForDataListener);
}
}
// Adds the listener to the event, if it has not been added previously.
function addSingleListener(source: EventEmitter, eventName: string,
listener: (...args: any[]) => void) {
if (!source.listeners(eventName).includes(listener))
source.on(eventName, listener);
}
/**
An iterator that doesn't emit any items.
@extends module:asynciterator.AsyncIterator
*/
export class EmptyIterator<T> extends AsyncIterator<T> {
/** Creates a new `EmptyIterator`. */
constructor() {
super();
this._changeState(ENDED, true);
}
}
/**
An iterator that emits a single item.
@extends module:asynciterator.AsyncIterator
*/
export class SingletonIterator<T> extends AsyncIterator<T> {
private _item: T | null;
/**
Creates a new `SingletonIterator`.
@param {object} item The item that will be emitted.
*/
constructor(item: T) {
super();
this._item = item;
if (item === null)
this.close();
else
this.readable = true;
}
/* Reads the item from the iterator. */
read() {
const item = this._item;
this._item = null;
this.close();
return item;
}
/* Generates details for a textual representation of the iterator. */
protected _toStringDetails() {
return this._item === null ? '' : `(${this._item})`;
}
}
/**
An iterator that emits the items of a given array.
@extends module:asynciterator.AsyncIterator
*/
export class ArrayIterator<T> extends AsyncIterator<T> {
private _buffer?: T[];
protected _index: number;
protected _sourceStarted: boolean;
protected _truncateThreshold: number;
/**
Creates a new `ArrayIterator`.
@param {Array} items The items that will be emitted.
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction
@param {boolean} [options.preserve=true] If false, the passed array can be safely modified
*/
constructor(items: Iterable<T> = [], { autoStart = true, preserve = true } = {}) {
super();
const buffer = preserve || !Array.isArray(items) ? [...items] : items;
this._index = 0;
this._sourceStarted = autoStart !== false;
this._truncateThreshold = preserve ? -1 : 64;
if (this._sourceStarted && buffer.length === 0)
this.close();
else
this._buffer = buffer;
this.readable = true;
}
/* Reads an item from the iterator. */
read() {
if (!this._sourceStarted)
this._sourceStarted = true;
let item = null;
if (this._buffer) {
// Emit the current item
if (this._index < this._buffer.length)
item = this._buffer[this._index++];
// Close when all elements have been returned
if (this._index === this._buffer.length) {
delete this._buffer;
this.close();
}
// Do need keep old items around indefinitely
else if (this._index === this._truncateThreshold) {
this._buffer.splice(0, this._truncateThreshold);
this._index = 0;
}
}
return item;
}
/* Generates details for a textual representation of the iterator. */
protected _toStringDetails() {
return `(${this._buffer ? this._buffer.length - this._index : 0})`;
}
/* Called by {@link module:asynciterator.AsyncIterator#destroy} */
protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) {
delete this._buffer;
callback();
}
/**
Consume all remaining items of the iterator into an array that will be returned asynchronously.
@param {object} [options] Settings for array creation
@param {integer} [options.limit] The maximum number of items to place in the array.
*/
toArray(options: { limit?: number } = {}): Promise<T[]> {
if (!this._buffer)
return Promise.resolve([]);
// Determine start and end index
const { length } = this._buffer;
const start = this._index;
const end = typeof options.limit !== 'number' ? length : start + options.limit;
// Slice the items off the buffer
const items = this._buffer.slice(start, end);
this._index = end;
// Close this iterator when we're past the end
if (end >= length)
this.close();
return Promise.resolve(items);
}
}
/**
An iterator that enumerates integers in a certain range.
@extends module:asynciterator.AsyncIterator
*/
export class IntegerIterator extends AsyncIterator<number> {
private _next: number;
private _step: number;
private _last: number;
/**
Creates a new `IntegerIterator`.
@param {object} [options] Settings of the iterator
@param {integer} [options.start=0] The first number to emit
@param {integer} [options.end=Infinity] The last number to emit
@param {integer} [options.step=1] The increment between two numbers
*/
constructor({ start = 0, step = 1, end } :
{ start?: number, step?: number, end?: number } = {}) {
super();
// Determine the first number
if (Number.isFinite(start))
start = Math.trunc(start);
this._next = start;
// Determine step size
if (Number.isFinite(step))
step = Math.trunc(step);
this._step = step;
// Determine the last number
const ascending = step >= 0;
const direction = ascending ? Infinity : -Infinity;
if (Number.isFinite(end as number))
end = Math.trunc(end as number);
else if (end !== -direction)
end = direction;
this._last = end;
// Start iteration if there is at least one item; close otherwise
if (!Number.isFinite(start) || (ascending ? start > end : start < end))
this.close();
else
this.readable = true;
}
/* Reads an item from the iterator. */
read() {
if (this.closed)
return null;
const current = this._next, step = this._step, last = this._last,
next = this._next += step;
if (step >= 0 ? next > last : next < last)
this.close();
return current;
}
/* Generates details for a textual representation of the iterator. */
protected _toStringDetails() {
return `(${this._next}...${this._last})`;
}
}
/**
* A synchronous mapping function from one element to another.
* A return value of `null` means that nothing should be emitted for a particular item.
*/
export type MapFunction<S, D = S> = (item: S) => D | null;
/** Function that maps an element to itself. */
export function identity<S>(item: S): typeof item {
return item;
}
/** Key indicating the current consumer of a source. */
export const DESTINATION = Symbol('destination');
/**
An iterator that synchronously transforms every item from its source
by applying a mapping function.
@extends module:asynciterator.AsyncIterator
*/
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
protected readonly _map: MapFunction<S, D>;
protected readonly _source: InternalSource<S>;
protected readonly _destroySource: boolean;
/**
* Applies the given mapping to the source iterator.
*/
constructor(
source: AsyncIterator<S>,
map: MapFunction<S, D> = identity as MapFunction<S, D>,
options: SourcedIteratorOptions = {}
) {
super();
this._map = map;
this._source = ensureSourceAvailable(source);
this._destroySource = options.destroySource !== false;
// Close if the source is already empty
if (source.done) {
this.close();
}
// Otherwise, wire up the source for reading
else {
this._source[DESTINATION] = this;
this._source.on('end', destinationClose);
this._source.on('error', destinationEmitError);
this._source.on('readable', destinationSetReadable);
this.readable = this._source.readable;
}
}
/* Tries to read the next item from the iterator. */
read(): D | null {
if (!this.done) {
// Try to read an item that maps to a non-null value
if (this._source.readable) {
let item: S | null, mapped: D | null;
while ((item = this._source.read()) !== null) {
if ((mapped = this._map(item)) !== null)
return mapped;
}
}
this.readable = false;
// Close this iterator if the source is empty
if (this._source.done)
this.close();
}
return null;
}
/* Cleans up the source iterator and ends. */
protected _end(destroy: boolean) {
this._source.removeListener('end', destinationClose);
this._source.removeListener('error', destinationEmitError);
this._source.removeListener('readable', destinationSetReadable);
delete this._source[DESTINATION];
if (this._destroySource)
this._source.destroy();
super._end(destroy);
}
}
// Validates an AsyncIterator for use as a source within another AsyncIterator
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new TypeError(`Invalid source: ${source}`);
if (!allowDestination && (source as any)[DESTINATION])
throw new Error('The source already has a destination');
return source as InternalSource<S>;
}
/**
An iterator that maintains an internal buffer of items.
This class serves as a base class for other iterators
with a typically complex item generation process.
@extends module:asynciterator.AsyncIterator
*/
export class BufferedIterator<T> extends AsyncIterator<T> {
private _buffer: LinkedList<T> = new LinkedList<T>();
private _maxBufferSize = 4;
protected _reading = true;
protected _pushedCount = 0;
protected _sourceStarted: boolean;
/**
Creates a new `BufferedIterator`.
@param {object} [options] Settings of the iterator
@param {integer} [options.maxBufferSize=4] The number of items to preload in the internal buffer
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction
*/
constructor({ maxBufferSize = 4, autoStart = true }: BufferedIteratorOptions = {}) {
super(INIT);
this.maxBufferSize = maxBufferSize;
taskScheduler(() => this._init(autoStart));
this._sourceStarted = autoStart !== false;
}
/**
The maximum number of items to preload in the internal buffer.
A `BufferedIterator` tries to fill its buffer as far as possible.
Set to `Infinity` to fully drain the source.
@type number
*/