Skip to content

Commit

Permalink
Support First
Browse files Browse the repository at this point in the history
  • Loading branch information
pocket7878 committed Feb 7, 2017
1 parent e1b20f6 commit ff138ac
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
13 changes: 13 additions & 0 deletions observable/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ func (o Observable) Filter(apply fx.FilterableFunc) Observable {
return Observable(out)
}

// First returns new Observable which emit only first item.
func (o Observable) First() Observable {
out := make(chan interface{})
go func() {
for item := range o {
out <- item
break
}
close(out)
}()
return Observable(out)
}

//Distinct supress duplicate items in the original Observable and returns
// a new Observable.
func (o Observable) Distinct(apply fx.KeySelectorFunc) Observable {
Expand Down
42 changes: 42 additions & 0 deletions observable/observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,48 @@ func TestObservableFilter(t *testing.T) {
assert.Exactly(t, []int{1, 2, 3, 7}, nums)
}

func TestObservableFirst(t *testing.T) {
items := []interface{}{0, 1, 3}
it, err := iterable.New(items)
if err != nil {
t.Fail()
}

stream1 := From(it)

stream2 := stream1.First()

nums := []int{}
onNext := handlers.NextFunc(func(item interface{}) {
if num, ok := item.(int); ok {
nums = append(nums, num)
}
})

sub := stream2.Subscribe(onNext)
<-sub

assert.Exactly(t, []int{0}, nums)
}

func TestObservableFirstWithEmpty(t *testing.T) {
stream1 := Empty()

stream2 := stream1.First()

nums := []int{}
onNext := handlers.NextFunc(func(item interface{}) {
if num, ok := item.(int); ok {
nums = append(nums, num)
}
})

sub := stream2.Subscribe(onNext)
<-sub

assert.Exactly(t, []int{}, nums)
}

func TestObservableDistinct(t *testing.T) {
items := []interface{}{1,2,2,1,3}
it, err := iterable.New(items)
Expand Down

0 comments on commit ff138ac

Please sign in to comment.