Skip to content

Commit

Permalink
Syntactic sugar just
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 25, 2020
1 parent 54fcd1d commit 9bb30d1
Show file tree
Hide file tree
Showing 60 changed files with 108 additions and 115 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ go get -u github.com/reactivex/rxgo/v2
Let's create our first Observable and consume an item:

```go
observable := rxgo.Just([]rxgo.Item{rxgo.Of("Hello, World!")})
observable := rxgo.Just("Hello, World!")()
ch := observable.Observe()
item := <-ch
fmt.Println(item.V)
```

The `Just` operator creates an Observable from a static list of items. `Of(value)` creates an item from a given value. If we want to create an item from an error, we have to use `Error(err)`. This is a difference with the v1 that was accepting directly a value or an error without having to wrap it. What's the rationale for this change? It is to prepare RxGo for the generics feature coming (hopefully) in Go 2.

By the way, the `Just` operator uses currying as a syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.

Once the Observable is created, we can observe it using `Observe()`. By default, an Observable is lazy in the sense that it emits items only once a subscription is made. `Observe()` returns a `<-chan rxgo.Item`.

We consumed an item from this channel and printed its value of the item using `item.V`.
Expand Down
2 changes: 1 addition & 1 deletion doc/all.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Determine whether all items emitted by an Observable meet some criteria.
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3, 4}).
observable := rxgo.Just(1, 2, 3, 4)().
All(func(i interface{}) bool {
// Check all items are less than 10
return i.(int) < 10
Expand Down
4 changes: 2 additions & 2 deletions doc/amb.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ Given two or more source Observables, emit all of the items from only the first

```go
observable := rxgo.Amb([]rxgo.Observable{
rxgo.Just([]interface{}{1, 2, 3}),
rxgo.Just([]interface{}{4, 5, 6}),
rxgo.Just(1, 2, 3)(),
rxgo.Just(4, 5, 6)(),
})
```

Expand Down
2 changes: 1 addition & 1 deletion doc/assert.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ There is a public API to facilitate writing unit tests while using RxGo. This is
```go
func TestMap(t *testing.T) {
err := errors.New("foo")
observable := rxgo.Just([]interface{}{1, 2, 3}).
observable := rxgo.Just(1, 2, 3)().
Map(func(_ context.Context, i interface{}) (interface{}, error) {
if i == 3 {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion doc/average.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Calculate the average of numbers emitted by an Observable and emits this average
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3, 4}).AverageInt()
observable := rxgo.Just(1, 2, 3, 4)().AverageInt()
```

Output:
Expand Down
2 changes: 1 addition & 1 deletion doc/buffer.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Periodically gather items emitted by an Observable into bundles and emit these b
![](http://reactivex.io/documentation/operators/images/bufferWithCount3.png)

```go
observable := rxgo.Just([]interface{}{1, 2, 3, 4}).BufferWithCount(3)
observable := rxgo.Just(1, 2, 3, 4)().BufferWithCount(3)
```

Output:
Expand Down
8 changes: 4 additions & 4 deletions doc/catch.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ Recover from an error by continuing the sequence without error.
### OnErrorResumeNext

```go
observable := rxgo.Just([]interface{}{1, 2, errors.New("foo")}).
observable := rxgo.Just(1, 2, errors.New("foo"))().
OnErrorResumeNext(func(e error) rxgo.Observable {
return rxgo.Just([]interface{}{3, 4})
return rxgo.Just(3, 4)()
})
```

Expand All @@ -33,7 +33,7 @@ Output:
### OnErrorReturn

```go
observable := rxgo.Just([]interface{}{1, errors.New("2"), 3, errors.New("4"), 5}).
observable := rxgo.Just(1, errors.New("2"), 3, errors.New("4"), 5)().
OnErrorReturn(func(err error) interface{} {
return err.Error()
})
Expand All @@ -52,7 +52,7 @@ Output:
### OnErrorReturnItem

```go
observable := rxgo.Just([]interface{}{1, errors.New("2"), 3, errors.New("4"), 5}).
observable := rxgo.Just(1, errors.New("2"), 3, errors.New("4"), 5)().
OnErrorReturnItem("foo")
```

Expand Down
4 changes: 2 additions & 2 deletions doc/combinelatest.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ observable := rxgo.CombineLatest(func(i ...interface{}) interface{} {
}
return sum
}, []rxgo.Observable{
rxgo.Just([]interface{}{1, 2}),
rxgo.Just([]interface{}{10, 11}),
rxgo.Just(1, 2)(),
rxgo.Just(10, 11)(),
})
```

Expand Down
4 changes: 2 additions & 2 deletions doc/concat.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ Emit the emissions from two or more Observables without interleaving them.

```go
observable := rxgo.Concat([]rxgo.Observable{
rxgo.Just([]interface{}{1, 2, 3}),
rxgo.Just([]interface{}{4, 5, 6}),
rxgo.Just(1, 2, 3)(),
rxgo.Just(4, 5, 6)(),
})
```

Expand Down
2 changes: 1 addition & 1 deletion doc/contains.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Determine whether an Observable emits a particular item or not.
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).Contains(func(i interface{}) bool {
observable := rxgo.Just(1, 2, 3)().Contains(func(i interface{}) bool {
return i == 2
})
```
Expand Down
2 changes: 1 addition & 1 deletion doc/count.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Count the number of items emitted by the source Observable and emit only this va
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).Count()
observable := rxgo.Just(1, 2, 3)().Count()
```

Output:
Expand Down
2 changes: 1 addition & 1 deletion doc/distinct.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Suppress duplicate items emitted by an Observable.
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 2, 3, 4, 4, 5}).
observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().
Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
})
Expand Down
2 changes: 1 addition & 1 deletion doc/distinctuntilchanged.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Suppress consecutive duplicate items in the original Observable.
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 2, 1, 1, 3}).
observable := rxgo.Just(1, 2, 2, 1, 1, 3)().
DistinctUntilChanged(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
})
Expand Down
6 changes: 3 additions & 3 deletions doc/do.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Each one returns a `<-chan struct{}` that closes once the Observable terminates.
### DoOnNext

```go
<-rxgo.Just([]interface{}{1, 2, 3}).
<-rxgo.Just(1, 2, 3)().
DoOnNext(func(i interface{}) {
fmt.Println(i)
})
Expand All @@ -36,7 +36,7 @@ Output:
### DoOnError

```go
<-rxgo.Just([]interface{}{1, 2, errors.New("foo")}).
<-rxgo.Just(1, 2, errors.New("foo"))().
DoOnError(func(err error) {
fmt.Println(err)
})
Expand All @@ -51,7 +51,7 @@ foo
### DoOnCompleted

```go
<-rxgo.Just([]interface{}{1, 2, 3}).
<-rxgo.Just(1, 2, 3)().
DoOnCompleted(func() {
fmt.Println("done")
})
Expand Down
2 changes: 1 addition & 1 deletion doc/elementat.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Emit only item n emitted by an Observable.
## Example

```go
observable := rxgo.Just([]interface{}{0, 1, 2, 3, 4}).ElementAt(2)
observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
```

Output:
Expand Down
2 changes: 1 addition & 1 deletion doc/error.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This method is blocking.
## Example

```go
err := rxgo.Just([]interface{}{1, 2, errors.New("foo")}).Error()
err := rxgo.Just(1, 2, errors.New("foo"))().Error()
fmt.Println(err)
```

Expand Down
4 changes: 2 additions & 2 deletions doc/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ This method is blocking.
## Example

```go
errs := rxgo.Just([]interface{}{
errs := rxgo.Just(
errors.New("foo"),
errors.New("bar"),
errors.New("baz"),
}).Errors(rxgo.WithErrorStrategy(rxgo.Continue))
)().Errors(rxgo.WithErrorStrategy(rxgo.Continue))
fmt.Println(errs)
```

Expand Down
2 changes: 1 addition & 1 deletion doc/filter.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Emit only those items from an Observable that pass a predicate test.
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).
observable := rxgo.Just(1, 2, 3)().
Filter(func(i interface{}) bool {
return i != 2
})
Expand Down
2 changes: 1 addition & 1 deletion doc/first.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Emit only the first item emitted by an Observable.
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).First()
observable := rxgo.Just(1, 2, 3)().First()
```

Output:
Expand Down
7 changes: 2 additions & 5 deletions doc/flatmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ Transform the items emitted by an Observable into Observables, then flatten the
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).FlatMap(func(i rxgo.Item) rxgo.Observable {
return rxgo.Just([]interface{}{
i.V.(int) * 10,
i.V.(int) * 100,
})
observable := rxgo.Just(1, 2, 3)().FlatMap(func(i rxgo.Item) rxgo.Observable {
return rxgo.Just(i.V.(int) * 10, i.V.(int) * 100)()
})
```

Expand Down
2 changes: 1 addition & 1 deletion doc/foreach.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ It returns a `<-chan struct{}` that closes once the Observable terminates.
## Example

```go
<-rxgo.Just([]interface{}{1, errors.New("foo")}).
<-rxgo.Just(1, errors.New("foo"))().
ForEach(
func(i interface{}) {
fmt.Printf("next: %v\n", i)
Expand Down
2 changes: 1 addition & 1 deletion doc/ignoreelements.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Do not emit any items from an Observable but mirror its termination notification
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, errors.New("foo")}).
observable := rxgo.Just(1, 2, errors.New("foo"))().
IgnoreElements()
```

Expand Down
6 changes: 3 additions & 3 deletions doc/just.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Convert an object or a set of objects into an Observable that emits that or thos
### Single Item

```go
observable := rxgo.Just(1)
observable := rxgo.Just(1)()
```

Output:
Expand All @@ -23,7 +23,7 @@ Output:
### Multiple Items

```go
observable := rxgo.Just([]interface{}{1, 2, 3})
observable := rxgo.Just(1, 2, 3)()
```

Output:
Expand All @@ -38,7 +38,7 @@ Output:

```go
externalCh := make(chan int)
observable := rxgo.Just(externalCh)
observable := rxgo.Just(externalCh)()
```

## Options
Expand Down
2 changes: 1 addition & 1 deletion doc/last.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Emit only the last item emitted by an Observable.
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).Last()
observable := rxgo.Just(1, 2, 3)().Last()
```

Output:
Expand Down
2 changes: 1 addition & 1 deletion doc/map.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Transform the items emitted by an Observable by applying a function to each item
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).
observable := rxgo.Just(1, 2, 3)().
Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int) * 10, nil
})
Expand Down
8 changes: 4 additions & 4 deletions doc/marshal.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ type customer struct {
ID int `json:"id"`
}

observable := rxgo.Just([]customer{
{
observable := rxgo.Just(
customer{
ID: 1,
},
{
customer{
ID: 2,
},
}).Marshal(json.Marshal)
)().Marshal(json.Marshal)
```

Output:
Expand Down
2 changes: 1 addition & 1 deletion doc/max.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Determine, and emit, the maximum-valued item emitted by an Observable.
## Example

```go
observable := rxgo.Just([]interface{}{2, 5, 1, 6, 3, 4}).
observable := rxgo.Just(2, 5, 1, 6, 3, 4)().
Max(func(i1 interface{}, i2 interface{}) int {
return i1.(int) - i2.(int)
})
Expand Down
4 changes: 2 additions & 2 deletions doc/merge.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ Combine multiple Observables into one by merging their emissions.

```go
observable := rxgo.Merge([]rxgo.Observable{
rxgo.Just([]interface{}{1, 2}),
rxgo.Just([]interface{}{3, 4}),
rxgo.Just(1, 2)(),
rxgo.Just(3, 4)(),
})
```

Expand Down
2 changes: 1 addition & 1 deletion doc/min.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Determine, and emit, the minimum-valued item emitted by an Observable.
## Example

```go
observable := rxgo.Just([]interface{}{2, 5, 1, 6, 3, 4}).
observable := rxgo.Just(2, 5, 1, 6, 3, 4)().
Max(func(i1 interface{}, i2 interface{}) int {
return i1.(int) - i2.(int)
})
Expand Down
2 changes: 1 addition & 1 deletion doc/reduce.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Apply a function to each item emitted by an Observable, sequentially, and emit t
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).
observable := rxgo.Just(1, 2, 3)().
Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
if acc == nil {
return elem, nil
Expand Down
2 changes: 1 addition & 1 deletion doc/repeat.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Create an Observable that emits a particular item multiple times at a particular
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3}).
observable := rxgo.Just(1, 2, 3)().
Repeat(3, rxgo.WithDuration(time.Second))
```

Expand Down
2 changes: 1 addition & 1 deletion doc/retry.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Implements a retry if a source Observable sends an error, resubscribe to it in t
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, errors.New("foo")}).Retry(2)
observable := rxgo.Just(1, 2, errors.New("foo"))().Retry(2)
```

Output:
Expand Down
2 changes: 1 addition & 1 deletion doc/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ It returns a `<-chan struct{}` that closes once the Observable terminates.
## Example

```go
<-rxgo.Just([]interface{}{1, 2, errors.New("foo")}).Run()
<-rxgo.Just(1, 2, errors.New("foo"))().Run()
```

## Options
Expand Down
2 changes: 1 addition & 1 deletion doc/scan.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Apply a function to each item emitted by an Observable, sequentially, and emit e
## Example

```go
observable := rxgo.Just([]interface{}{1, 2, 3, 4, 5}).
observable := rxgo.Just(1, 2, 3, 4, 5)().
Scan(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
if acc == nil {
return elem, nil
Expand Down
Loading

0 comments on commit 9bb30d1

Please sign in to comment.