Skip to content

Commit

Permalink
Merge pull request #65 from cloudnc/feat/async-do-work
Browse files Browse the repository at this point in the history
feat(Worker pool): permit async functions for simpler DoWork api
  • Loading branch information
zakhenry authored Mar 25, 2021
2 parents 9c39fd3 + fd8fe12 commit 1fb4c57
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 27 deletions.
20 changes: 6 additions & 14 deletions .betterer.results
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// BETTERER RESULTS V1.
exports[`stricter compilation`] = {
timestamp: 1603555076420,
timestamp: 1616658221627,
value: `{
"projects/observable-webworker/src/lib/from-worker-pool.spec.ts:2815790124": [
"projects/observable-webworker/src/lib/from-worker-pool.spec.ts:634732869": [
[288, 33, 33, "Argument of type \'(observer: Observer<number>) => () => void\' is not assignable to parameter of type \'(this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic\'.\\n Types of parameters \'observer\' and \'subscriber\' are incompatible.\\n Type \'Subscriber<T>\' is not assignable to type \'Observer<number>\'.\\n Types of property \'next\' are incompatible.\\n Type \'(value?: T | undefined) => void\' is not assignable to type \'(value: number) => void\'.\\n Types of parameters \'value\' and \'value\' are incompatible.\\n Type \'number\' is not assignable to type \'T | undefined\'.", "1166033031"]
],
"projects/observable-webworker/src/lib/from-worker.spec.ts:2047567224": [
Expand All @@ -13,13 +13,7 @@ exports[`stricter compilation`] = {
"projects/observable-webworker/src/lib/from-worker.ts:4009652202": [
[28, 56, 11, "Argument of type \'Input | undefined\' is not assignable to parameter of type \'Input\'.\\n \'Input\' could be instantiated with an arbitrary type which could be unrelated to \'Input | undefined\'.\\n Type \'undefined\' is not assignable to type \'Input\'.\\n \'Input\' could be instantiated with an arbitrary type which could be unrelated to \'undefined\'.", "1574273654"]
],
"projects/observable-webworker/src/lib/run-worker.spec.ts:1321560577": [
[19, 8, 17, "Type \'undefined\' is not assignable to type \'Observable<number>\'.", "4204377774"],
[25, 8, 17, "Type \'undefined\' is not assignable to type \'Observable<number>\'.", "4204377774"],
[38, 8, 17, "Type \'undefined\' is not assignable to type \'Observable<number>\'.", "4204377774"],
[44, 8, 17, "Type \'undefined\' is not assignable to type \'Observable<number>\'.", "4204377774"]
],
"projects/observable-webworker/src/lib/run-worker.ts:1913596105": [
"projects/observable-webworker/src/lib/run-worker.ts:4122498212": [
[4, 65, 7, "Rest parameter \'args\' implicitly has an \'any[]\' type.", "3622679692"],
[51, 38, 26, "Cannot invoke an object which is possibly \'undefined\'.", "4028913799"],
[51, 65, 18, "Argument of type \'O | undefined\' is not assignable to parameter of type \'O\'.\\n \'O\' could be instantiated with an arbitrary type which could be unrelated to \'O | undefined\'.\\n Type \'undefined\' is not assignable to type \'O\'.\\n \'O\' could be instantiated with an arbitrary type which could be unrelated to \'undefined\'.", "3307950093"]
Expand All @@ -36,11 +30,9 @@ exports[`stricter compilation`] = {
[13, 9, 5, "Property \'color\' has no initializer and is not definitely assigned in the constructor.", "176948952"],
[18, 44, 17, "Argument of type \'string | undefined\' is not assignable to parameter of type \'string\'.\\n Type \'undefined\' is not assignable to type \'string\'.", "463397358"]
],
"src/app/multiple-worker-pool/multiple-worker-pool.component.ts:345284253": [
[38, 55, 17, "Property \'timelineComponent\' has no initializer and is not definitely assigned in the constructor.", "964610961"],
[63, 8, 28, "Argument of type \'OperatorFunction<HashWorkerMessage, string | undefined>\' is not assignable to parameter of type \'OperatorFunction<HashWorkerMessage, string>\'.\\n Type \'string | undefined\' is not assignable to type \'string\'.\\n Type \'undefined\' is not assignable to type \'string\'.", "3851287027"],
[87, 9, 14, "Type \'Observable<{ millisSinceLast: number | null; file?: string | undefined; timestamp: Date; message: string; thread: Thread; fileEventType: FileHashEvent | null; }[]>\' is not assignable to type \'Observable<HashWorkerMessage[]>\'.\\n Type \'{ millisSinceLast: number | null; file?: string | undefined; timestamp: Date; message: string; thread: Thread; fileEventType: FileHashEvent | null; }[]\' is not assignable to type \'HashWorkerMessage[]\'.\\n Type \'{ millisSinceLast: number | null; file?: string | undefined; timestamp: Date; message: string; thread: Thread; fileEventType: FileHashEvent | null; }\' is not assignable to type \'HashWorkerMessage\'.\\n Types of property \'millisSinceLast\' are incompatible.\\n Type \'number | null\' is not assignable to type \'number | undefined\'.\\n Type \'null\' is not assignable to type \'number | undefined\'.", "463576723"],
[240, 30, 6, "Parameter \'$event\' implicitly has an \'any\' type.", "3058907597"]
"src/app/multiple-worker-pool/multiple-worker-pool.component.ts:3101768987": [
[40, 55, 17, "Property \'timelineComponent\' has no initializer and is not definitely assigned in the constructor.", "964610961"],
[90, 9, 14, "Type \'Observable<{ millisSinceLast: number | null; file?: string | undefined; timestamp: Date; message: string; thread: Thread; fileEventType: FileHashEvent | null; }[]>\' is not assignable to type \'Observable<HashWorkerMessage[]>\'.\\n Type \'{ millisSinceLast: number | null; file?: string | undefined; timestamp: Date; message: string; thread: Thread; fileEventType: FileHashEvent | null; }[]\' is not assignable to type \'HashWorkerMessage[]\'.\\n Type \'{ millisSinceLast: number | null; file?: string | undefined; timestamp: Date; message: string; thread: Thread; fileEventType: FileHashEvent | null; }\' is not assignable to type \'HashWorkerMessage\'.\\n Types of property \'millisSinceLast\' are incompatible.\\n Type \'number | null\' is not assignable to type \'number | undefined\'.\\n Type \'null\' is not assignable to type \'number | undefined\'.", "463576723"]
],
"src/app/single-worker/single-worker.component.ts:228104130": [
[25, 22, 6, "Parameter \'$event\' implicitly has an \'any\' type.", "3058907597"]
Expand Down
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ order with input order
### Example

In this simple example, we have a function that receives an array of files and returns an observable of the MD5 sum
hashes of those files. For simplicity we're passing the primitives back and forth, however in reality you are likely to
hashes of those files. For simplicity, we're passing the primitives back and forth, however in reality you are likely to
want to construct your own interface to define the messages being passed to and from the worker.

#### Main Thread
Expand Down Expand Up @@ -247,3 +247,18 @@ Note here that the worker class `implements DoWorkUnit<File, string>`. This is d
If using the `fromWorkerPool` strategy, you must only implement `DoWorkUnit` as it relies on the completion of the
returned observable to indicate that the unit of work is finished processing, and the next unit of work can be
transferred to the worker.

Commonly, a worker that implements `DoWorkUnit` only needs to return a single value, so you may instead return a `Promise`
from the `workUnit` method.

```ts
// src/app/doc/async-work.worker.ts#L7-L14

@ObservableWorker()
export class FactorizationWorker implements DoWorkUnit<number, number[]> {
public async workUnit(input: number): Promise<number[]> {
return factorize(input);
}
}

```
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { fakeAsync, tick } from '@angular/core/testing';
import { Observable, Observer, Subject, Subscription } from 'rxjs';
import { Notification, NotificationKind } from 'rxjs/internal/Notification';
import { Notification } from 'rxjs/internal/Notification';
import { reduce } from 'rxjs/operators';
import { fromWorkerPool } from './from-worker-pool';

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Notification, Observable } from 'rxjs';
import { NotificationKind } from 'rxjs/internal/Notification';
import { take } from 'rxjs/operators';
import { ObservableWorker } from './observable-worker.decorator';
import { DoWork, WorkerMessageNotification } from './observable-worker.types';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface WorkerMessageNotification<T> extends MessageEvent {
}

export interface DoWorkUnit<I, O> {
workUnit(input: I): Observable<O>;
workUnit(input: I): Observable<O> | PromiseLike<O>;
selectTransferables?(output: O): Transferable[];
}

Expand Down
45 changes: 40 additions & 5 deletions projects/observable-webworker/src/lib/run-worker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { fakeAsync, tick } from '@angular/core/testing';
import { BehaviorSubject, Notification, Observable, of } from 'rxjs';
import { NotificationKind } from 'rxjs/internal/Notification';
import {
DoTransferableWork,
DoTransferableWorkUnit,
Expand All @@ -17,13 +17,13 @@ describe('workerIsTransferableType', () => {
}

public work(input$: Observable<number>): Observable<number> {
return undefined;
return of(1);
}
}

class TestWorkerNotTransferable implements DoWork<number, number> {
public work(input$: Observable<number>): Observable<number> {
return undefined;
return of(1);
}
}

Expand All @@ -36,13 +36,13 @@ describe('workerIsUnitType', () => {
it('should identify a worker as being able to do work units', () => {
class TestWorkerUnit implements DoWorkUnit<number, number> {
public workUnit(input: number): Observable<number> {
return undefined;
return of(1);
}
}

class TestWorkerNotUnit implements DoWork<number, number> {
public work(input$: Observable<number>): Observable<number> {
return undefined;
return of(1);
}
}

Expand Down Expand Up @@ -170,4 +170,39 @@ describe('runWorker', () => {

sub.unsubscribe();
});

it('should permit promises to be returned from doWork to allow for simpler async/await patterns', fakeAsync(() => {
const postMessageSpy = spyOn(window, 'postMessage');

class TestWorkerUnit implements DoWorkUnit<number, number> {
public async workUnit(input: number): Promise<number> {
return input * 2;
}
}

const sub = runWorker(TestWorkerUnit);

const event: WorkerMessageNotification<number> = new MessageEvent('message', {
data: new Notification('N', 1),
});

self.dispatchEvent(event);

tick();

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'N',
value: 2,
}),
);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'C',
}),
);

sub.unsubscribe();
}));
});
4 changes: 2 additions & 2 deletions projects/observable-webworker/src/lib/run-worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fromEvent, Notification, Observable, Subscription } from 'rxjs';
import { from, fromEvent, Notification, Observable, Subscription } from 'rxjs';
import { concatMap, dematerialize, filter, map, materialize } from 'rxjs/operators';
import { DoTransferableWork, DoWork, DoWorkUnit, WorkerMessageNotification } from './observable-worker.types';

Expand Down Expand Up @@ -33,7 +33,7 @@ export function getWorkerResult<I, O>(
);

return workerIsUnitType(worker)
? input$.pipe(concatMap(input => worker.workUnit(input).pipe(materialize())))
? input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize())))
: worker.work(input$).pipe(materialize());
}

Expand Down
12 changes: 12 additions & 0 deletions src/app/doc/async-work.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { DoWorkUnit, ObservableWorker } from 'observable-webworker';

function factorize(input: number): number[] {
return []; // actual implementation left for the reader :)
}

@ObservableWorker()
export class FactorizationWorker implements DoWorkUnit<number, number[]> {
public async workUnit(input: number): Promise<number[]> {
return factorize(input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export class MultipleWorkerPoolComponent {
),
),
map(message => message.file),
filter((filename): filename is string => !!filename),
scan<string, string[]>((files, file) => [...files, file], []),
startWith([]),
),
Expand Down Expand Up @@ -246,8 +247,8 @@ export class MultipleWorkerPoolComponent {
);
}

public calculateMD5Multiple($event): void {
const files: File[] = Array.from($event.target.files);
public calculateMD5Multiple($event: Event): void {
const files: File[] = Array.from(($event.target as HTMLInputElement).files || []);
this.multiFilesToHash.next(files);
for (const file of files) {
this.eventsPool$.next(this.logMessage(FileHashEvent.SELECTED, 'file selected', file.name));
Expand Down

0 comments on commit 1fb4c57

Please sign in to comment.