From 6bfbf34289a1e8cac42a63552c58325a41d7334a Mon Sep 17 00:00:00 2001 From: Kyle Kirbatski Date: Mon, 22 Apr 2024 11:58:00 -0500 Subject: [PATCH] Add condition, take, fork to forkApi --- .../toolkit/src/listenerMiddleware/index.ts | 123 +++++++++++------- .../toolkit/src/listenerMiddleware/types.ts | 84 ++++++++++-- 2 files changed, 149 insertions(+), 58 deletions(-) diff --git a/packages/toolkit/src/listenerMiddleware/index.ts b/packages/toolkit/src/listenerMiddleware/index.ts index 03d68eba0d..9fcf8dcc1a 100644 --- a/packages/toolkit/src/listenerMiddleware/index.ts +++ b/packages/toolkit/src/listenerMiddleware/index.ts @@ -82,52 +82,6 @@ const INTERNAL_NIL_TOKEN = {} as const const alm = 'listenerMiddleware' as const -const createFork = ( - parentAbortSignal: AbortSignalWithReason, - parentBlockingPromises: Promise[], -) => { - const linkControllers = (controller: AbortController) => - addAbortSignalListener(parentAbortSignal, () => - abortControllerWithReason(controller, parentAbortSignal.reason), - ) - - return ( - taskExecutor: ForkedTaskExecutor, - opts?: ForkOptions, - ): ForkedTask => { - assertFunction(taskExecutor, 'taskExecutor') - const childAbortController = new AbortController() - - linkControllers(childAbortController) - - const result = runTask( - async (): Promise => { - validateActive(parentAbortSignal) - validateActive(childAbortController.signal) - const result = (await taskExecutor({ - pause: createPause(childAbortController.signal), - delay: createDelay(childAbortController.signal), - signal: childAbortController.signal, - })) as T - validateActive(childAbortController.signal) - return result - }, - () => abortControllerWithReason(childAbortController, taskCompleted), - ) - - if (opts?.autoJoin) { - parentBlockingPromises.push(result.catch(noop)) - } - - return { - result: createPause>(parentAbortSignal)(result), - cancel() { - abortControllerWithReason(childAbortController, taskCancelled) - }, - } - } -} - const createTakePattern = ( startListening: AddListenerOverloads, signal: AbortSignal, @@ -192,6 +146,77 @@ const createTakePattern = ( catchRejection(take(predicate, timeout))) as TakePattern } +const createFork = ( + parentAbortSignal: AbortSignalWithReason, + parentBlockingPromises: Promise[], + startListening: AddListenerOverloads, +) => { + const linkControllers = (controller: AbortController) => + addAbortSignalListener(parentAbortSignal, () => + abortControllerWithReason(controller, parentAbortSignal.reason), + ) + + return ( + taskExecutor: ForkedTaskExecutor, + opts?: ForkOptions, + ): ForkedTask => { + assertFunction(taskExecutor, 'taskExecutor') + const childAbortController = new AbortController() + + linkControllers(childAbortController) + + const take = createTakePattern( + startListening as AddListenerOverloads, + childAbortController.signal, + ) + + const autoJoinPromises: Promise[] = [] + + const result = runTask( + async (): Promise => { + try { + validateActive(parentAbortSignal) + validateActive(childAbortController.signal) + const result = (await taskExecutor({ + condition: ( + predicate: AnyListenerPredicate, + timeout?: number, + ) => take(predicate, timeout).then(Boolean), + take, + delay: createDelay(childAbortController.signal), + pause: createPause(childAbortController.signal), + signal: childAbortController.signal, + fork: createFork( + childAbortController.signal, + autoJoinPromises, + startListening, + ), + throwIfCancelled: () => { + validateActive(childAbortController.signal) + }, + })) as T + validateActive(childAbortController.signal) + return result + } finally { + await Promise.all(autoJoinPromises) + } + }, + () => abortControllerWithReason(childAbortController, taskCompleted), + ) + + if (opts?.autoJoin) { + parentBlockingPromises.push(result.catch(noop)) + } + + return { + result: createPause>(parentAbortSignal)(result), + cancel() { + abortControllerWithReason(childAbortController, taskCancelled) + }, + } + } +} + const getListenerEntryPropsFrom = (options: FallbackAddListenerOptions) => { let { type, actionCreator, matcher, predicate, effect } = options @@ -408,7 +433,11 @@ export const createListenerMiddleware = < pause: createPause(internalTaskController.signal), extra, signal: internalTaskController.signal, - fork: createFork(internalTaskController.signal, autoJoinPromises), + fork: createFork( + internalTaskController.signal, + autoJoinPromises, + startListening, + ), unsubscribe: entry.unsubscribe, subscribe: () => { listenerMap.set(entry.id, entry) diff --git a/packages/toolkit/src/listenerMiddleware/types.ts b/packages/toolkit/src/listenerMiddleware/types.ts index 73a7ef1c0e..1c879a7a3b 100644 --- a/packages/toolkit/src/listenerMiddleware/types.ts +++ b/packages/toolkit/src/listenerMiddleware/types.ts @@ -51,12 +51,66 @@ export interface ConditionFunction { export type MatchFunction = (v: any) => v is T /** @public */ -export interface ForkedTaskAPI { +export interface ForkedTaskAPI { + /** + * Returns a promise that resolves when the input predicate returns `true` or + * rejects if the listener has been cancelled or is completed. + * + * The return value is `true` if the predicate succeeds or `false` if a timeout is provided and expires first. + * + * ### Example + * + * ```ts + * const updateBy = createAction('counter/updateBy'); + * + * middleware.startListening({ + * actionCreator: updateBy, + * async effect(_, { condition }) { + * // wait at most 3s for `updateBy` actions. + * if(await condition(updateBy.match, 3_000)) { + * // `updateBy` has been dispatched twice in less than 3s. + * } + * } + * }) + * ``` + */ + condition: ConditionFunction + /** + * Returns a promise that resolves when the input predicate returns `true` or + * rejects if the listener has been cancelled or is completed. + * + * The return value is the `[action, currentState, previousState]` combination that the predicate saw as arguments. + * + * The promise resolves to null if a timeout is provided and expires first, + * + * ### Example + * + * ```ts + * const updateBy = createAction('counter/updateBy'); + * + * middleware.startListening({ + * actionCreator: updateBy, + * async effect(_, { take }) { + * const [{ payload }] = await take(updateBy.match); + * console.log(payload); // logs 5; + * } + * }) + * + * store.dispatch(updateBy(5)); + * ``` + */ + take: TakePattern /** * Returns a promise that resolves when `waitFor` resolves or * rejects if the task or the parent listener has been cancelled or is completed. */ pause(waitFor: Promise): Promise + /** + * An abort signal whose `aborted` property is set to `true` + * if the task execution is either aborted or completed. + * @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + */ + signal: AbortSignal /** * Returns a promise that resolves after `timeoutMs` or * rejects if the task or the parent listener has been cancelled or is completed. @@ -64,25 +118,30 @@ export interface ForkedTaskAPI { */ delay(timeoutMs: number): Promise /** - * An abort signal whose `aborted` property is set to `true` - * if the task execution is either aborted or completed. - * @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + * Queues in the next microtask the execution of a task. + * @param executor + * @param options */ - signal: AbortSignal + fork( + executor: ForkedTaskExecutor, + options?: ForkOptions, + ): ForkedTask } /** @public */ -export interface AsyncTaskExecutor { - (forkApi: ForkedTaskAPI): Promise +export interface AsyncTaskExecutor { + (forkApi: ForkedTaskAPI): Promise } /** @public */ -export interface SyncTaskExecutor { - (forkApi: ForkedTaskAPI): T +export interface SyncTaskExecutor { + (forkApi: ForkedTaskAPI): T } /** @public */ -export type ForkedTaskExecutor = AsyncTaskExecutor | SyncTaskExecutor +export type ForkedTaskExecutor = + | AsyncTaskExecutor + | SyncTaskExecutor /** @public */ export type TaskResolved = { @@ -257,7 +316,10 @@ export interface ListenerEffectAPI< * @param executor * @param options */ - fork(executor: ForkedTaskExecutor, options?: ForkOptions): ForkedTask + fork( + executor: ForkedTaskExecutor, + options?: ForkOptions, + ): ForkedTask /** * Returns a promise that resolves when `waitFor` resolves or * rejects if the listener has been cancelled or is completed.