-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpoll.ts
81 lines (74 loc) · 2.49 KB
/
poll.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import { defer, EMPTY, MonoTypeOperatorFunction, of, retry, switchMap, tap, throwError, timer } from 'rxjs';
import { normalizeConfig, PollConfig, PollState, RetryKey } from './common/config';
import { getPoller$, visibilityState$ } from './common/observables';
import { Nil } from './common/utils';
/**
 * ### RxJS Poll Operator
 *
 * Polls the source using the "repeat" or "interval" approach. The first emission is sent
 * immediately, then the polling starts. Values keep emitting until stopped by the user.
 *
 * Polling state (poll count, retry counters, last value and last error) is created per
 * subscription, so subscribing again - directly or through an outer `retry`/`repeat` -
 * always starts from a clean state.
 *
 * When the source errors, it is retried after `getDelay(state)` milliseconds until the
 * tracked retry counter exceeds the configured `retries`, at which point the error is
 * rethrown to the subscriber.
 *
 * Read {@link https://www.npmjs.com/package/rxjs-poll|docs} for more info.
 *
 * #### Example
 *
 * ```ts
 * import { poll } from 'rxjs-poll';
 * import { takeWhile } from 'rxjs';
 *
 * request$
 *   .pipe(
 *     poll(),
 *     takeWhile(({ isDone }) => !isDone, true)
 *   )
 *   .subscribe();
 * ```
 *
 * @param config - {@link PollConfig} object used for polling configuration
 *
 * @returns A function that returns an Observable that will resubscribe to the source on
 * complete or error
 */
export function poll<T>(config?: PollConfig<T> | Nil): MonoTypeOperatorFunction<T> {
  return (source$) => {
    const { type, retries, isBackgroundMode, isConsecutiveRule, getDelay } = normalizeConfig(config);
    // Which counter is compared against `retries`: the consecutive one or the running total.
    const retryKey: RetryKey = isConsecutiveRule ? 'consecutiveRetries' : 'retries';
    // In background mode the poller is always treated as visible; otherwise it follows
    // `visibilityState$` (presumably the page visibility - see ./common/observables).
    const visibility$ = isBackgroundMode ? of(true) : visibilityState$;

    // `defer` postpones building the mutable state until subscribe time. Without it the
    // counters would be created once per `pipe()` call and shared by every subscriber.
    return defer(() => {
      const state: PollState<T> = {
        polls: 0,
        retries: 0,
        consecutiveRetries: 0,
        // No value has been emitted yet; `null` is a placeholder until the first poll.
        // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
        value: null as any,
        error: null,
      };

      // Records a successful poll and asks the config how long to wait before the next one.
      const nextPollDelay = (value: T): number => {
        state.polls += 1;
        state.value = value;

        return getDelay(state);
      };

      const poller$ = getPoller$(type, source$, nextPollDelay);

      return visibility$.pipe(
        // A truthy emission (re)starts the poller; a falsy one switches to EMPTY, which
        // unsubscribes from the running poller. State is kept across these switches.
        switchMap((isVisible) =>
          isVisible
            ? poller$.pipe(
                retry({
                  delay(error) {
                    // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
                    state.error = error;
                    state.retries += 1;
                    state.consecutiveRetries += 1;

                    // Over the limit: rethrow. Otherwise resubscribe after the computed delay.
                    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
                    return state[retryKey] > retries ? throwError(() => error) : timer(getDelay(state));
                  },
                }),
                // Any emitted value ends the current error streak.
                tap(() => {
                  state.error = null;
                  state.consecutiveRetries = 0;
                })
              )
            : EMPTY
        )
      );
    });
  };
}