forked from ReactiveX/RxGo
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
69 changed files
with
2,948 additions
and
3,022 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# All Operator | ||
|
||
## Overview | ||
|
||
Determine whether all items emitted by an Observable meet some criteria. | ||
|
||
 | ||
|
||
## Example | ||
|
||
```go | ||
observable := rxgo.Just([]interface{}{1, 2, 3, 4}). | ||
All(func(i interface{}) bool { | ||
// Check all items are less than 10 | ||
return i.(int) < 10 | ||
}) | ||
``` | ||
|
||
Output: | ||
|
||
``` | ||
true | ||
``` | ||
|
||
## Options | ||
|
||
### WithBufferedChannel | ||
|
||
[Detail](options.md#withbufferedchannel) | ||
|
||
### WithContext | ||
|
||
[Detail](options.md#withcontext) | ||
|
||
### WithObservationStrategy | ||
|
||
[Detail](options.md#withobservationstrategy) | ||
|
||
### WithErrorStrategy | ||
|
||
[Detail](options.md#witherrorstrategy) | ||
|
||
### WithPool | ||
|
||
https://github.com/ReactiveX/RxGo/wiki/Options#withpool | ||
|
||
### WithCPUPool | ||
|
||
https://github.com/ReactiveX/RxGo/wiki/Options#withcpupool |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# Amb Operator | ||
|
||
## Overview | ||
|
||
Given two or more source Observables, emit all of the items from only the first of these Observables to emit an item. | ||
|
||
 | ||
|
||
## Example | ||
|
||
```go | ||
observable := rxgo.Amb([]rxgo.Observable{ | ||
rxgo.Just([]interface{}{1, 2, 3}), | ||
rxgo.Just([]interface{}{4, 5, 6}), | ||
}) | ||
``` | ||
|
||
Output: | ||
|
||
``` | ||
1 | ||
2 | ||
3 | ||
``` | ||
or | ||
``` | ||
4 | ||
5 | ||
6 | ||
``` | ||
|
||
## Options | ||
|
||
### WithBufferedChannel | ||
|
||
[Detail](options.md#withbufferedchannel) | ||
|
||
### WithContext | ||
|
||
[Detail](options.md#withcontext) | ||
|
||
### WithObservationStrategy | ||
|
||
[Detail](options.md#withobservationstrategy) | ||
|
||
### WithErrorStrategy | ||
|
||
[Detail](options.md#witherrorstrategy) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# Average Operator | ||
|
||
## Overview | ||
|
||
Calculate the average of numbers emitted by an Observable and emits this average. | ||
|
||
 | ||
|
||
## Instances | ||
|
||
* `AverageFloat32` | ||
* `AverageFloat64` | ||
* `AverageInt` | ||
* `AverageInt8` | ||
* `AverageInt16` | ||
* `AverageInt32` | ||
* `AverageInt64` | ||
|
||
## Example | ||
|
||
```go | ||
observable := rxgo.Just([]interface{}{1, 2, 3, 4}).AverageInt() | ||
``` | ||
|
||
Output: | ||
|
||
``` | ||
2 | ||
``` | ||
|
||
## Options | ||
|
||
### WithBufferedChannel | ||
|
||
[Detail](options.md#withbufferedchannel) | ||
|
||
### WithContext | ||
|
||
[Detail](options.md#withcontext) | ||
|
||
### WithObservationStrategy | ||
|
||
[Detail](options.md#withobservationstrategy) | ||
|
||
### WithErrorStrategy | ||
|
||
[Detail](options.md#witherrorstrategy) | ||
|
||
### WithPool | ||
|
||
https://github.com/ReactiveX/RxGo/wiki/Options#withpool | ||
|
||
### WithCPUPool | ||
|
||
https://github.com/ReactiveX/RxGo/wiki/Options#withcpupool |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# BackOffRetry Operator | ||
|
||
## Overview | ||
|
||
Implements a backoff retry if a source Observable sends an error, resubscribe to it in the hopes that it will complete without error. | ||
|
||
The backoff configuration relies on [github.com/cenkalti/backoff/v4](github.com/cenkalti/backoff/v4). | ||
|
||
 | ||
|
||
## Example | ||
|
||
```go | ||
// Backoff retry configuration | ||
backOffCfg := backoff.NewExponentialBackOff() | ||
backOffCfg.InitialInterval = 10 * time.Millisecond | ||
|
||
observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) { | ||
next <- rxgo.Of(1) | ||
next <- rxgo.Of(2) | ||
next <- rxgo.Error(errors.New("foo")) | ||
done() | ||
}}).BackOffRetry(backoff.WithMaxRetries(backOffCfg, 2)) | ||
``` | ||
|
||
Output: | ||
|
||
``` | ||
1 | ||
2 | ||
1 | ||
2 | ||
1 | ||
2 | ||
foo | ||
``` | ||
|
||
## Options | ||
|
||
### WithBufferedChannel | ||
|
||
[Detail](options.md#withbufferedchannel) | ||
|
||
### WithContext | ||
|
||
[Detail](options.md#withcontext) | ||
|
||
### WithObservationStrategy | ||
|
||
[Detail](options.md#withobservationstrategy) | ||
|
||
### WithErrorStrategy | ||
|
||
[Detail](options.md#witherrorstrategy) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
# Buffer Operator | ||
|
||
## Overview | ||
|
||
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time | ||
|
||
 | ||
|
||
## Instances | ||
|
||
* `BufferWithCount`: | ||
|
||
 | ||
|
||
```go | ||
observable := rxgo.Just([]interface{}{1, 2, 3, 4}).BufferWithCount(3) | ||
``` | ||
|
||
Output: | ||
|
||
``` | ||
1 2 3 | ||
4 | ||
``` | ||
|
||
* `BufferWithTime`: | ||
|
||
 | ||
|
||
```go | ||
// Create the producer | ||
ch := make(chan rxgo.Item, 1) | ||
go func() { | ||
i := 0 | ||
for range time.Tick(time.Second) { | ||
ch <- rxgo.Of(i) | ||
i++ | ||
} | ||
}() | ||
|
||
observable := rxgo.FromChannel(ch). | ||
BufferWithTime(rxgo.WithDuration(3*time.Second), nil) | ||
``` | ||
|
||
Output: | ||
|
||
``` | ||
0 1 2 | ||
3 4 5 | ||
6 7 8 | ||
... | ||
``` | ||
|
||
* `BufferWithTimeOrCount`: | ||
|
||
 | ||
|
||
```go | ||
// Create the producer | ||
ch := make(chan rxgo.Item, 1) | ||
go func() { | ||
i := 0 | ||
for range time.Tick(time.Second) { | ||
ch <- rxgo.Of(i) | ||
i++ | ||
} | ||
}() | ||
|
||
observable := rxgo.FromChannel(ch). | ||
BufferWithTimeOrCount(rxgo.WithDuration(3*time.Second), 2) | ||
``` | ||
|
||
Output: | ||
|
||
``` | ||
0 1 | ||
2 3 | ||
4 5 | ||
... | ||
``` | ||
|
||
## Options | ||
|
||
### WithBufferedChannel | ||
|
||
[Detail](options.md#withbufferedchannel) | ||
|
||
### WithContext | ||
|
||
[Detail](options.md#withcontext) | ||
|
||
### WithObservationStrategy | ||
|
||
[Detail](options.md#withobservationstrategy) | ||
|
||
### WithErrorStrategy | ||
|
||
[Detail](options.md#witherrorstrategy) |
Oops, something went wrong.