Skip to content

Commit

Permalink
feat: implement saga-style effects using abortController
Browse files Browse the repository at this point in the history
  • Loading branch information
FaberVitale committed Dec 11, 2021
1 parent 952a632 commit 81cf582
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 456 deletions.
8 changes: 8 additions & 0 deletions packages/action-listener-middleware/src/exceptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export class TaskAbortError implements Error {
name: string
message: string
constructor(public reason?: string) {
this.name = 'TaskAbortError'
this.message = `task cancelled` + (reason != null ? `: "${reason}"` : '')
}
}
250 changes: 152 additions & 98 deletions packages/action-listener-middleware/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
AnyAction,
Action,
ThunkDispatch,
MiddlewareAPI,
} from '@reduxjs/toolkit'
import { createAction, nanoid } from '@reduxjs/toolkit'

Expand All @@ -28,17 +29,13 @@ import type {
WithMiddlewareType,
TakePattern,
ListenerErrorInfo,
TaskExecutor,
ForkedTask,
} from './types'

import {
Job,
SupervisorJob,
JobHandle,
JobCancellationReason,
JobCancellationException,
} from './job'
import { TaskAbortError } from './exceptions'
import { Outcome } from './outcome'

export { TaskAbortError } from './exceptions'
export type {
ActionListener,
ActionListenerMiddleware,
Expand All @@ -51,6 +48,10 @@ export type {
TypedAddListener,
TypedAddListenerAction,
Unsubscribe,
TaskExecutor,
ForkedTask,
AsyncTaskExecutor,
SyncTaskExecutor,
} from './types'

function assertFunction(
Expand All @@ -65,73 +66,109 @@ function assertFunction(
const defaultWhen: MiddlewarePhase = 'afterReducer'
const actualMiddlewarePhases = ['beforeReducer', 'afterReducer'] as const

function assertActive(signal: AbortSignal, reason?: string) {
if (signal.aborted) {
throw new TaskAbortError(reason)
}
}

function createDelay(signal: AbortSignal) {
return function delay(timeoutMs: number): Promise<void> {
return new Promise((resolve, reject) => {
assertActive(signal)

setTimeout(() => {
try {
assertActive(signal)
} catch (err) {
reject(err)
}
resolve()
}, timeoutMs)
})
}
}

function createFork(parentAbortSignal: AbortSignal) {
return function fork<T>(childJobExecutor: TaskExecutor<T>): ForkedTask<T> {
const childAbortController = new AbortController()
const promise = Outcome.wrap(
new Promise<T>((resolve, reject) => {
assertActive(parentAbortSignal)
Promise.resolve(childJobExecutor()).then((val) => {
assertActive(parentAbortSignal)
assertActive(childAbortController.signal)
resolve(val)
}, reject)
})
)

return {
promise,
controller: childAbortController,
}
}
}

function createTakePattern<S>(
addListener: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
parentJob: Job<any>
signal: AbortSignal
): TakePattern<S> {
/**
* A function that takes an ActionListenerPredicate and an optional timeout,
* and resolves when either the predicate returns `true` based on an action
* state combination or when the timeout expires.
* If the parent listener is canceled while waiting, this will throw a
* JobCancellationException.
* TaskAbortError.
*/
async function take<P extends AnyActionListenerPredicate<S>>(
predicate: P,
timeout: number | undefined
) {
assertActive(signal)

// Placeholder unsubscribe function until the listener is added
let unsubscribe: Unsubscribe = () => {}

// We'll add an additional nested Job representing this function.
// TODO This is really a duplicate of the other job inside the middleware.
// This behavior requires some additional nesting:
// We're going to create a `Promise` representing the result of the listener,
// but then wrap that in an `Outcome` for consistent error handling.
let job: Job<[AnyAction, S, S]> = parentJob.launch(async (job) =>
Outcome.wrap(
new Promise<[AnyAction, S, S]>((resolve) => {
// Inside the Promise, we synchronously add the listener.
unsubscribe = addListener({
predicate: predicate as any,
listener: (action, listenerApi): void => {
// One-shot listener that cleans up as soon as the predicate passes
listenerApi.unsubscribe()
// Resolve the promise with the same arguments the predicate saw
resolve([
action,
listenerApi.getState(),
listenerApi.getOriginalState(),
])
},
parentJob,
})
})
const signalPromise = new Promise<null>((_, reject) => {
signal.addEventListener('abort', () => {
reject(new TaskAbortError())
}, { once: true });
});

const tuplePromise = new Promise<[AnyAction, S, S]>((resolve) => {
// Inside the Promise, we synchronously add the listener.
unsubscribe = addListener({
predicate: predicate as any,
listener: (action, listenerApi): void => {
// One-shot listener that cleans up as soon as the predicate passes
listenerApi.unsubscribe()
// Resolve the promise with the same arguments the predicate saw
resolve([
action,
listenerApi.getState(),
listenerApi.getOriginalState(),
])
},
})
})

const promises: (Promise<null> | Promise<[AnyAction, S, S]>)[] = [
signalPromise,
tuplePromise,
]

if (timeout != null) {
promises.push(
new Promise<null>((resolve) => setTimeout(resolve, timeout, null))
)
)

let result: Outcome<[AnyAction, S, S]>
}

try {
// Run the job and use the timeout if given
result = await (timeout !== undefined
? job.runWithTimeout(timeout)
: job.run())

if (result.isOk()) {
// Resolve the actual `take` promise with the action+states
return result.value
} else {
if (
result.error instanceof JobCancellationException &&
result.error.reason === JobCancellationReason.JobCancelled
) {
// The `take` job itself was canceled due to timeout.
return null
}
// The parent was canceled - reject this promise with that error
throw result.error
}
const output = await Promise.race(promises)

assertActive(signal)
return output
} finally {
// Always clean up the listener
unsubscribe()
Expand Down Expand Up @@ -171,10 +208,10 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
listener: options.listener,
type,
predicate,
taskAbortControllerSet: new Set<AbortController>(),
unsubscribe: () => {
throw new Error('Unsubscribe not initialized')
},
parentJob: new SupervisorJob(),
}

return entry
Expand Down Expand Up @@ -334,6 +371,63 @@ export function createActionListenerMiddleware<
return true
}

async function notifyListener(
entry: ListenerEntry<unknown, Dispatch<AnyAction>>,
action: AnyAction,
api: MiddlewareAPI,
getOriginalState: () => S,
currentPhase: MiddlewarePhase
) {
const internalTaskController = new AbortController()
const take = createTakePattern(addListener, internalTaskController.signal)
const condition: ConditionFunction<S> = (predicate, timeout) => {
return take(predicate, timeout).then(Boolean)
}
const delay = createDelay(internalTaskController.signal)
const fork = createFork(internalTaskController.signal)

try {
entry.taskAbortControllerSet.add(internalTaskController)
await Promise.resolve(
entry.listener(action, {
...api,
getOriginalState,
condition,
take,
delay,
currentPhase,
extra,
signal: internalTaskController.signal,
fork,
unsubscribe: entry.unsubscribe,
subscribe: () => {
listenerMap.set(entry.id, entry)
},
cancelPrevious: () => {
entry.taskAbortControllerSet.forEach((controller, _, set) => {
if (
controller !== internalTaskController &&
!controller.signal.aborted
) {
controller.abort()
}
})
},
})
)
} catch (listenerError) {
if (!(listenerError instanceof TaskAbortError)) {
safelyNotifyError(onError, listenerError, {
raisedBy: 'listener',
phase: currentPhase,
})
}
} finally {
internalTaskController.abort() // Notify that the task has completed
entry.taskAbortControllerSet.delete(internalTaskController)
}
}

const middleware: Middleware<
{
(action: Action<'actionListenerMiddleware/add'>): Unsubscribe
Expand Down Expand Up @@ -390,47 +484,7 @@ export function createActionListenerMiddleware<
continue
}

entry.parentJob.launchAndRun(async (jobHandle) => {
const take = createTakePattern(addListener, jobHandle as Job<any>)
const condition: ConditionFunction<S> = (predicate, timeout) => {
return take(predicate, timeout).then(Boolean)
}

const result = await Outcome.try(async () =>
entry.listener(action, {
...api,
getOriginalState,
condition,
take,
currentPhase,
extra,
unsubscribe: entry.unsubscribe,
subscribe: () => {
listenerMap.set(entry.id, entry)
},
job: jobHandle,
cancelPrevious: () => {
entry.parentJob.cancelChildren(
new JobCancellationException(
JobCancellationReason.JobCancelled
),
[jobHandle]
)
},
})
)
if (
result.isError() &&
!(result.error instanceof JobCancellationException)
) {
safelyNotifyError(onError, result.error, {
raisedBy: 'listener',
phase: currentPhase,
})
}

return Outcome.ok(1)
})
notifyListener(entry, action, api, getOriginalState, currentPhase)
}
if (currentPhase === 'beforeReducer') {
result = next(action)
Expand Down
Loading

0 comments on commit 81cf582

Please sign in to comment.