Skip to content

Commit

Permalink
Merge pull request reduxjs#2074 from reduxjs/feature/listener-abort-task
Browse files Browse the repository at this point in the history
  • Loading branch information
markerikson authored Feb 26, 2022
2 parents 88819e3 + 19b8904 commit effab14
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 12 deletions.
10 changes: 5 additions & 5 deletions packages/toolkit/src/listenerMiddleware/exceptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ const completed = 'completed'
const cancelled = 'cancelled'

/* TaskAbortError error codes */
export const taskCancelled = `${task}-${cancelled}` as const
export const taskCompleted = `${task}-${completed}` as const
export const taskCancelled = `task-${cancelled}` as const
export const taskCompleted = `task-${completed}` as const
export const listenerCancelled = `${listener}-${cancelled}` as const
export const listenerCompleted = `${listener}-${completed}` as const

export class TaskAbortError implements SerializedError {
name = 'TaskAbortError'
message = ''
constructor(public code = 'unknown') {
this.message = `task cancelled (reason: ${code})`
message: string
constructor(public code: string | undefined) {
this.message = `${task} ${cancelled} (reason: ${code})`
}
}
11 changes: 10 additions & 1 deletion packages/toolkit/src/listenerMiddleware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import type {
ForkedTask,
TypedRemoveListener,
TaskResult,
AbortSignalWithReason,
} from './types'
import {
abortControllerWithReason,
addAbortSignalListener,
assertFunction,
catchRejection,
} from './utils'
Expand Down Expand Up @@ -74,11 +76,18 @@ const INTERNAL_NIL_TOKEN = {} as const

const alm = 'listenerMiddleware' as const

const createFork = (parentAbortSignal: AbortSignal) => {
const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
const linkControllers = (controller: AbortController) =>
addAbortSignalListener(parentAbortSignal, () =>
abortControllerWithReason(controller, parentAbortSignal.reason)
)

return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
assertFunction(taskExecutor, 'taskExecutor')
const childAbortController = new AbortController()

linkControllers(childAbortController)

const result = runTask<T>(
async (): Promise<T> => {
validateActive(parentAbortSignal)
Expand Down
4 changes: 2 additions & 2 deletions packages/toolkit/src/listenerMiddleware/task.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TaskAbortError } from './exceptions'
import type { AbortSignalWithReason, TaskResult } from './types'
import { catchRejection } from './utils'
import { addAbortSignalListener, catchRejection } from './utils'

/**
* Synchronously raises {@link TaskAbortError} if the task tied to the input `signal` has been cancelled.
Expand Down Expand Up @@ -29,7 +29,7 @@ export const promisifyAbortSignal = (
if (signal.aborted) {
notifyRejection()
} else {
signal.addEventListener('abort', notifyRejection, { once: true })
addAbortSignalListener(signal, notifyRejection)
}
})
)
Expand Down
92 changes: 90 additions & 2 deletions packages/toolkit/src/listenerMiddleware/tests/fork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@ import type { EnhancedStore } from '@reduxjs/toolkit'
import { configureStore, createSlice, createAction } from '@reduxjs/toolkit'

import type { PayloadAction } from '@reduxjs/toolkit'
import type { ForkedTaskExecutor, TaskResult } from '../types'
import type {
AbortSignalWithReason,
ForkedTaskExecutor,
TaskResult,
} from '../types'
import { createListenerMiddleware, TaskAbortError } from '../index'
import { listenerCancelled, taskCancelled } from '../exceptions'
import {
listenerCancelled,
listenerCompleted,
taskCancelled,
} from '../exceptions'

function delay(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
Expand Down Expand Up @@ -312,6 +320,58 @@ describe('fork', () => {
})
})

test('forkApi.delay rejects as soon as the parent listener is cancelled', async () => {
let deferredResult = deferred()

startListening({
actionCreator: increment,
effect: async (_, listenerApi) => {
listenerApi.cancelActiveListeners()
await listenerApi.fork(async (forkApi) => {
await forkApi
.delay(100)
.then(deferredResult.resolve, deferredResult.resolve)

return 4
}).result

deferredResult.resolve(new Error('unreachable'))
},
})

store.dispatch(increment())

await Promise.resolve()

store.dispatch(increment())
expect(await deferredResult).toEqual(
new TaskAbortError(listenerCancelled)
)
})

test('forkApi.signal listener is invoked as soon as the parent listener is cancelled or completed', async () => {
let deferredResult = deferred()

startListening({
actionCreator: increment,
async effect(_, listenerApi) {
const wronglyDoNotAwaitResultOfTask = listenerApi.fork(
async (forkApi) => {
forkApi.signal.addEventListener('abort', () => {
deferredResult.resolve(
(forkApi.signal as AbortSignalWithReason<unknown>).reason
)
})
}
)
},
})

store.dispatch(increment)

expect(await deferredResult).toBe(listenerCompleted)
})

test('fork.delay does not trigger unhandledRejections for completed or cancelled tasks', async () => {
let deferredCompletedEvt = deferred()
let deferredCancelledEvt = deferred()
Expand Down Expand Up @@ -384,6 +444,34 @@ describe('fork', () => {
})
})

test('forkApi.pause rejects as soon as the parent listener is cancelled', async () => {
let deferredResult = deferred()

startListening({
actionCreator: increment,
effect: async (_, listenerApi) => {
listenerApi.cancelActiveListeners()
const forkedTask = listenerApi.fork(async (forkApi) => {
await forkApi
.pause(delay(100))
.then(deferredResult.resolve, deferredResult.resolve)

return 4
})

await forkedTask.result
deferredResult.resolve(new Error('unreachable'))
},
})

store.dispatch(increment())

await Promise.resolve()

store.dispatch(increment())
expect(await deferredResult).toEqual(new TaskAbortError(listenerCancelled))
})

test('forkApi.pause rejects if listener is cancelled', async () => {
const incrementByInListener = createAction<number>('incrementByInListener')

Expand Down
4 changes: 2 additions & 2 deletions packages/toolkit/src/listenerMiddleware/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ export type MatchFunction<T> = (v: any) => v is T
export interface ForkedTaskAPI {
/**
* Returns a promise that resolves when `waitFor` resolves or
* rejects if the task has been cancelled or completed.
* rejects if the task or the parent listener has been cancelled or is completed.
*/
pause<W>(waitFor: Promise<W>): Promise<W>
/**
* Returns a promise resolves after `timeoutMs` or
* rejects if the task has been cancelled or is completed.
* rejects if the task or the parent listener has been cancelled or is completed.
* @param timeoutMs
*/
delay(timeoutMs: number): Promise<void>
Expand Down
7 changes: 7 additions & 0 deletions packages/toolkit/src/listenerMiddleware/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ export const catchRejection = <T>(
return promise
}

export const addAbortSignalListener = (
abortSignal: AbortSignal,
callback: (evt: Event) => void
) => {
abortSignal.addEventListener('abort', callback, { once: true })
}

/**
* Calls `abortController.abort(reason)` and patches `signal.reason`.
* if it is not supported.
Expand Down

0 comments on commit effab14

Please sign in to comment.