Skip to content

Commit

Permalink
Merge pull request reduxjs#1830 from reduxjs/feature/listener-size-op…
Browse files Browse the repository at this point in the history
…timization
  • Loading branch information
markerikson authored Dec 15, 2021
2 parents c4e18b3 + ce658ef commit 61a7900
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 131 deletions.
143 changes: 67 additions & 76 deletions packages/action-listener-middleware/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ const listenerMiddleware = createActionListenerMiddleware()

// Add one or more listener callbacks for specific actions. They may
// contain any sync or async logic, similar to thunks.
listenerMiddleware.addListener(todoAdded, (action, listenerApi) => {
listenerMiddleware.addListener(todoAdded, async (action, listenerApi) => {
// Run whatever additional side-effect-y logic you want here
const { text } = action.payload
console.log('Todo added: ', text)
console.log('Todo added: ', action.payload.text)

if (text === 'Buy milk') {
// Use the listener API methods to dispatch, get state, or unsubscribe the listener
// Can cancel previous running instances
listenerApi.cancelPrevious()

// Run async logic
const data = await fetchData()

// Pause until action dispatched or state changed
if (await listenerApi.condition(matchSomeAction)) {
// Use the listener API methods to dispatch, get state,
// unsubscribe the listener, or cancel previous
listenerApi.dispatch(todoAdded('Buy pet food'))
listenerApi.unsubscribe()
}
Expand Down Expand Up @@ -228,58 +235,68 @@ The `listenerApi` object is the second argument to each listener callback. It co

- `unsubscribe: () => void`: will remove the listener from the middleware
- `subscribe: () => void`: will re-subscribe the listener if it was previously removed, or no-op if currently subscribed
- `cancelPrevious: () => void`: cancels any previously running instances of this same listener. (The cancelation will only have a meaningful effect if the previous instances are paused using one of the `job` APIs, `take`, or `condition` - see "Cancelation and Job Management" in the "Usage" section for more details)
- `cancelPrevious: () => void`: cancels any previously running instances of this same listener. (The cancelation will only have a meaningful effect if the previous instances are paused using one of the cancelation-aware APIs like `take/cancel/pause/delay` - see "Cancelation and Task Management" in the "Usage" section for more details)
- `signal: AbortSignal`: An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) whose `aborted` property will be set to `true` if the listener execution is aborted or completed.

Dynamically unsubscribing and re-subscribing this listener allows for more complex async workflows, such as avoiding duplicate running instances by calling `listenerApi.unsubscribe()` at the start of a listener, or calling `listenerApi.cancelPrevious()` to ensure that only the most recent instance is allowed to complete.

#### Conditional Workflow Execution

- `take: (predicate: ListenerPredicate, timeout?: number) => Promise<[Action, State, State] | null>`: returns a promise that will resolve when the `predicate` returns `true`. The return value is the `[action, currentState, previousState]` combination that the predicate saw as arguments. If a `timeout` is provided and expires if a `timeout` is provided and expires first. the promise resolves to `null`.
- `condition: (predicate: ListenerPredicate, timeout?: number) => Promise<boolean>`: Similar to `take`, but resolves to `true` if the predicate succeeds, and `false` if a `timeout` is provided and expires first. This allows async logic to pause and wait for some condition to occur before continuing. See "Writing Async Workflows" below for details on usage.
- `delay: (timeoutMs: number) => Promise<void>`: returns a cancelation-aware promise that resolves after the timeout, or rejects if canceled before the expiration
- `pause: (promise: Promise<T>) => Promise<T>`: accepts any promise, and returns a cancelation-aware promise that either resolves with the argument promise or rejects if canceled before the resolution

These methods provide the ability to write conditional logic based on future dispatched actions and state changes. Both also accept an optional `timeout` in milliseconds.

`take` resolves to a `[action, currentState, previousState]` tuple or `null` if it timed out, whereas `condition` resolves to `true` if it succeeded or `false` if timed out.

`take` is meant for "wait for an action and get its contents", while `condition` is meant for checks like `if (await condition(predicate))`.

Both these methods are cancelation-aware, and will throw a `JobCancelationException` if the listener instance is canceled while paused.
Both these methods are cancelation-aware, and will throw a `TaskAbortError` if the listener instance is canceled while paused.

#### Child Tasks

#### Job API
- `fork: (executor: (forkApi: ForkApi) => T | Promise<T>) => ForkedTask<T>`: Launches a "child task" that may be used to accomplish additional work. Accepts any sync or async function as its argument, and returns a `{result, cancel}` object that can be used to check the final status and return value of the child task, or cancel it while in-progress.

- `job: JobHandle`: a group of functions that allow manipulating the current running listener instance, including cancelation-aware delays, and launching "child Jobs" that can be used to run additional nested logic.
Child tasks can be launched, and waited on to collect their return values. The provided `executor` function will be called with a `forkApi` object containing `{pause, delay, signal}`, allowing it to pause or check cancelation status. It can also make use of the `listenerApi` from the listener's scope.

The job implementation is based on https://github.com/ethossoftworks/job-ts . The `JobHandle` type includes:
An example of this might be a listener that forks a child task containing an infinite loop that listens for events from a server. The parent then uses `listenerApi.condition()` to wait for a "stop" action, and cancels the child task.

The task and result types are:

```ts
interface JobHandle {
isActive: boolean
isCompleted: boolean
isCancelled: boolean
childCount: number
ensureActive(): void
launch<R>(func: JobFunc<R>): Job<R>
launchAndRun<R>(func: JobFunc<R>): Promise<Outcome<R>>
pause<R>(func: Promise<R>): Promise<R>
delay(milliseconds: number): Promise<void>
cancel(reason?: JobCancellationException): void
cancelChildren(
reason?: JobCancellationException,
skipChildren?: JobHandle[]
): void
export interface ForkedTaskAPI {
pause<W>(waitFor: Promise<W>): Promise<W>
delay(timeoutMs: number): Promise<void>
signal: AbortSignal
}
```

`pause` and `delay` are both cancelation-aware. If the current listener is canceled, they will throw a `JobCancelationException` if the listener instance is canceled while paused.
export type TaskResolved<T> = {
readonly status: 'ok'
readonly value: T
}

Child jobs can be launched, and waited on to collect their return values.
export type TaskRejected = {
readonly status: 'rejected'
readonly error: unknown
}

Note that the jobs API relies on a functional-style async result abstraction called an `Outcome`, which wraps promise results.
export type TaskCancelled = {
readonly status: 'cancelled'
readonly error: TaskAbortError
}

This API will be documented more as the middleware implementation is finalized. For now, you can see the existing third-party library docs here:
export type TaskResult<Value> =
| TaskResolved<Value>
| TaskRejected
| TaskCancelled

- https://github.com/ethossoftworks/job-ts/blob/main/docs/api.md
- https://github.com/ethossoftworks/outcome-ts#usage
export interface ForkedTask<T> {
result: Promise<TaskResult<T>>
cancel(): void
}
```

## Usage Guide

Expand All @@ -289,7 +306,7 @@ This middleware lets you run additional logic when some action is dispatched, as

This middleware is not intended to handle all possible use cases. Like thunks, it provides you with a basic set of primitives (including access to `dispatch` and `getState`), and gives you freedom to write any sync or async logic you want. This is both a strength (you can do anything!) and a weakness (you can do anything, with no guard rails!).

As of v0.4.0, the middleware does include several async workflow primitives that are sufficient to write equivalents to many Redux-Saga effects operators like `takeLatest`, `takeLeading`, and `debounce`.
As of v0.5.0, the middleware does include several async workflow primitives that are sufficient to write equivalents to many Redux-Saga effects operators like `takeLatest`, `takeLeading`, and `debounce`.

### Standard Usage Patterns

Expand Down Expand Up @@ -401,54 +418,29 @@ test('condition method resolves promise when there is a timeout', async () => {
})
```
### Cancelation and Job Management
As of 0.4.0, the middleware now uses a `Job` abstraction to help manage cancelation of existing listener instances. The `Job` implementation is based on https://github.com/ethossoftworks/job-ts .
Each running listener instance is wrapped in a `Job` that provides cancelation awareness. A running `Job` instance has a `JobHandle` object that can be used to control it:
```ts
interface JobHandle {
isActive: boolean
isCompleted: boolean
isCancelled: boolean
childCount: number
ensureActive(): void
launch<R>(func: JobFunc<R>): Job<R>
launchAndRun<R>(func: JobFunc<R>): Promise<Outcome<R>>
pause<R>(func: Promise<R>): Promise<R>
delay(milliseconds: number): Promise<void>
cancel(reason?: JobCancellationException): void
cancelChildren(
reason?: JobCancellationException,
skipChildren?: JobHandle[]
): void
}
```

`listenerApi.job` exposes that `JobHandle` for the current listener instance so it can be accessed by the listener logic.
### Cancelation and Task Management
Full documentation of `JobHandle` can currently be viewed at https://github.com/ethossoftworks/job-ts/blob/main/docs/api.md . Note that this API also uses a custom functional-style wrapper around async results called an `Outcome`: https://github.com/ethossoftworks/outcome-ts .
As of 0.5.0, the middleware now supports cancelation of running listener instances, `take/condition`/pause/delay` functions, and "child tasks", with an implementation based on [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController).

The `listenerApi.job.pause/delay()` functions provide a cancelation-aware way to have the current listener sleep. `pause()` accepts a promise, while `delay` accepts a timeout value. If the listener is canceled while waiting, a `JobCancelationException` will be thrown.
The `listenerApi.pause/delay()` functions provide a cancelation-aware way to have the current listener sleep. `pause()` accepts a promise, while `delay` accepts a timeout value. If the listener is canceled while waiting, a `TaskAbortError` will be thrown. In addition, both `take` and `condition` support cancelation interruption as well.

This can also be used to launch "child jobs" that can do additional work. These can be waited on to collect their results. An example of this might look like:
`listenerApi.fork()` can used to launch "child tasks" that can do additional work. These can be waited on to collect their results. An example of this might look like:

```ts
middleware.addListener({
actionCreator: increment,
listener: async (action, listenerApi) => {
// Spawn a child job and start it immediately
const childJobPromise = listenerApi.job.launchAndRun(async (jobHandle) => {
// Spawn a child task and start it immediately
const task = listenerApi.fork(async (forkApi) => {
// Artificially wait a bit inside the child
await jobHandle.delay(5)
// Complete the child by returning an Outcome-wrapped value
return Outcome.ok(42)
await forkApi.delay(5)
// Complete the child by returning an Ovalue
return 42
})
const result = await childJobPromise
const result = await task.result
// Unwrap the child result in the listener
if (result.isOk()) {
if (result.status === 'ok') {
console.log('Child succeeded: ', result.value)
}
},
Expand All @@ -457,7 +449,7 @@ middleware.addListener({

### Complex Async Workflows

The provided async workflow primitives (`cancelPrevious`, `unsuscribe`, `subscribe`, `take`, `condition`, `job.pause`, `job.delay`) can be used to implement many of the more complex async workflow capabilities found in the Redux-Saga library. This includes effects such as `throttle`, `debounce`, `takeLatest`, `takeLeading`, and `fork/join`. Some examples:
The provided async workflow primitives (`cancelPrevious`, `unsuscribe`, `subscribe`, `take`, `condition`, `pause`, `delay`) can be used to implement many of the more complex async workflow capabilities found in the Redux-Saga library. This includes effects such as `throttle`, `debounce`, `takeLatest`, `takeLeading`, and `fork/join`. Some examples:

```js
test('debounce / takeLatest', async () => {
Expand All @@ -474,7 +466,7 @@ test('debounce / takeLatest', async () => {
listenerApi.cancelPrevious()
// Delay before starting actual work
await listenerApi.job.delay(15)
await listenerApi.delay(15)
// do work here
},
Expand Down Expand Up @@ -515,18 +507,17 @@ test('canceled', async () => {
if (increment.match(action)) {
// Have this branch wait around to be canceled by the other
try {
await listenerApi.job.delay(10)
await listenerApi.delay(10)
} catch (err) {
// Can check cancelation based on the exception and its reason
if (err instanceof JobCancellationException) {
if (err instanceof TaskAbortError) {
canceledAndCaught = true
}
}
} else if (incrementByAmount.match(action)) {
// do a non-cancelation-aware wait
await sleep(15)
// Or can check based on `job.isCancelled`
if (listenerApi.job.isCancelled) {
await delay(15)
if (listenerApi.signal.aborted) {
canceledCheck = true
}
} else if (decrement.match(action)) {
Expand Down
9 changes: 4 additions & 5 deletions packages/action-listener-middleware/src/exceptions.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
export class TaskAbortError implements Error {
name: string
message: string
constructor(public reason?: string) {
this.name = 'TaskAbortError'
this.message = `task cancelled` + (reason != null ? `: "${reason}"` : '')
name = 'TaskAbortError'
message = ''
constructor(public reason = 'unknown') {
this.message = `task cancelled (reason: ${reason})`
}
}
91 changes: 49 additions & 42 deletions packages/action-listener-middleware/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,14 @@ export type {
TaskResult,
} from './types'

const defaultWhen: MiddlewarePhase = 'afterReducer'
const actualMiddlewarePhases = ['beforeReducer', 'afterReducer'] as const
//Overly-aggressive byte-shaving
const { assign } = Object

const beforeReducer = 'beforeReducer' as const
const afterReducer = 'afterReducer' as const

const defaultWhen: MiddlewarePhase = afterReducer
const actualMiddlewarePhases = [beforeReducer, afterReducer] as const

const createFork = (parentAbortSignal: AbortSignal) => {
return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
Expand Down Expand Up @@ -162,19 +168,17 @@ const createTakePattern = <S>(
export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
options: FallbackAddListenerOptions
) => {
let predicate: ListenerPredicate<any, any>
let type: string | undefined

if ('type' in options) {
type = options.type
predicate = (action: any): action is any => action.type === type
} else if ('actionCreator' in options) {
type = options.actionCreator!.type
predicate = options.actionCreator.match
} else if ('matcher' in options) {
predicate = options.matcher
} else if ('predicate' in options) {
predicate = options.predicate
let { type, actionCreator, matcher, predicate, listener } = options

if (type) {
predicate = createAction(type).match
} else if (actionCreator) {
type = actionCreator!.type
predicate = actionCreator.match
} else if (matcher) {
predicate = matcher
} else if (predicate) {
// pass
} else {
throw new Error(
'Creating a listener requires one of the known fields for matching against actions'
Expand All @@ -185,7 +189,7 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
const entry: ListenerEntry<unknown> = {
when: options.when || defaultWhen,
id,
listener: options.listener,
listener,
type,
predicate,
pendingSet: new Set<AbortController>(),
Expand Down Expand Up @@ -371,30 +375,33 @@ export function createActionListenerMiddleware<
try {
entry.pendingSet.add(internalTaskController)
await Promise.resolve(
entry.listener(action, {
...api,
getOriginalState,
condition,
take,
delay,
pause,
currentPhase,
extra,
signal: internalTaskController.signal,
fork,
unsubscribe: entry.unsubscribe,
subscribe: () => {
listenerMap.set(entry.id, entry)
},
cancelPrevious: () => {
entry.pendingSet.forEach((controller, _, set) => {
if (controller !== internalTaskController) {
controller.abort()
set.delete(controller)
}
})
},
})
entry.listener(
action,
// Use assign() rather than ... to avoid extra helper functions added to bundle
assign({}, api, {
getOriginalState,
condition,
take,
delay,
pause,
currentPhase,
extra,
signal: internalTaskController.signal,
fork,
unsubscribe: entry.unsubscribe,
subscribe: () => {
listenerMap.set(entry.id, entry)
},
cancelPrevious: () => {
entry.pendingSet.forEach((controller, _, set) => {
if (controller !== internalTaskController) {
controller.abort()
set.delete(controller)
}
})
},
})
)
)
} catch (listenerError) {
if (!(listenerError instanceof TaskAbortError)) {
Expand Down Expand Up @@ -467,15 +474,15 @@ export function createActionListenerMiddleware<

notifyListener(entry, action, api, getOriginalState, currentPhase)
}
if (currentPhase === 'beforeReducer') {
if (currentPhase === beforeReducer) {
result = next(action)
} else {
return result
}
}
}

return Object.assign(
return assign(
middleware,
{
addListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ describe('Saga-style Effects Scenarios', () => {
} else if (incrementByAmount.match(action)) {
// do a non-cancelation-aware wait
await delay(15)
// Or can check based on `job.isCancelled`
if (listenerApi.signal.aborted) {
canceledCheck = true
}
Expand Down
Loading

0 comments on commit 61a7900

Please sign in to comment.