Skip to content

Commit

Permalink
first proposal of RX "repeat create operation"
Browse files Browse the repository at this point in the history
  • Loading branch information
Arek Czarnik committed Feb 21, 2017
1 parent 716e8b1 commit 604078d
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 21 deletions.
32 changes: 32 additions & 0 deletions observable/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,38 @@ func Interval(term chan struct{}, timeout time.Duration) Observable {
return Observable(source)
}

// Interval creates an Observable emitting ntimes or infinity the given item.
func Repeat(item interface{}, ntimes ...int) Observable {
source := make(chan interface{})
// this is the infinity case no ntime parameter is given
if len(ntimes) == 0 {
go func() {
for {
source <- item
}
close(source)
}()
return Observable(source)
}

// this repeate ntime
if len(ntimes) > 0 {
count := ntimes[0]
if count <= 0 {
return Empty()
}
go func() {
for i := 0; i < count; i++ {
source <- item
}
close(source)
}()
return Observable(source)
}

return Empty()
}

// Range creates an Observable that emits a particular range of sequential integers.
func Range(start, end int) Observable {
source := make(chan interface{})
Expand Down
138 changes: 117 additions & 21 deletions observable/observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func TestObservableFirst(t *testing.T) {
nums = append(nums, num)
}
})

sub := stream2.Subscribe(onNext)
<-sub

Expand Down Expand Up @@ -513,59 +513,59 @@ func TestObservableLastWithEmpty(t *testing.T) {
}

func TestObservableDistinct(t *testing.T) {
items := []interface{}{1,2,2,1,3}
items := []interface{}{1, 2, 2, 1, 3}
it, err := iterable.New(items)
if err != nil {
t.Fail()
}

stream1 := From(it)

id := func(item interface{}) interface{} {
return item
return item
}

stream2 := stream1.Distinct(id)

nums := []int{}
onNext := handlers.NextFunc(func(item interface{}) {
if num, ok := item.(int); ok {
nums = append(nums, num)
}
})

sub := stream2.Subscribe(onNext)
<- sub
assert.Exactly(t, []int{1,2,3}, nums)
<-sub

assert.Exactly(t, []int{1, 2, 3}, nums)
}

func TestObservableDistinctUntilChanged(t *testing.T) {
items := []interface{}{1,2,2,1,3}
items := []interface{}{1, 2, 2, 1, 3}
it, err := iterable.New(items)
if err != nil {
t.Fail()
}

stream1 := From(it)

id := func(item interface{}) interface{} {
return item
return item
}

stream2 := stream1.DistinctUntilChanged(id)

nums := []int{}
onNext := handlers.NextFunc(func(item interface{}) {
if num, ok := item.(int); ok {
nums = append(nums, num)
}
})

sub := stream2.Subscribe(onNext)
<- sub
assert.Exactly(t, []int{1,2,1,3}, nums)
<-sub

assert.Exactly(t, []int{1, 2, 1, 3}, nums)
}

func TestObservableScanWithIntegers(t *testing.T) {
Expand Down Expand Up @@ -647,3 +647,99 @@ func TestObservableScanWithString(t *testing.T) {

assert.Exactly(t, expected, words)
}

func TestRepeatInfinityOperator(t *testing.T) {
myStream := Repeat("mystring")

item, err := myStream.Next()

if err != nil {
assert.Fail(t, "fail to emit next item", err)
}

if value, ok := item.(string); ok {
assert.Equal(t, value, "mystring")
} else {
assert.Fail(t, "fail to emit next item", err)
}
}

func TestRepeatNtimeOperator(t *testing.T) {
myStream := Repeat("mystring", 2)
stringarray := []string{}

onNext := handlers.NextFunc(func(item interface{}) {
if value, ok := item.(string); ok {
stringarray = append(stringarray, value)
}
})

onDone := handlers.DoneFunc(func() {
stringarray = append(stringarray, "end")
})

sub := myStream.Subscribe(observer.New(onNext, onDone))
<-sub

assert.Exactly(t, []string{"mystring", "mystring", "end"}, stringarray)
}

func TestRepeatNtimeMultiVariadicOperator(t *testing.T) {
myStream := Repeat("mystring", 2, 2, 3, 4, 5, 6, 7)
stringarray := []string{}

onNext := handlers.NextFunc(func(item interface{}) {
if value, ok := item.(string); ok {
stringarray = append(stringarray, value)
}
})

onDone := handlers.DoneFunc(func() {
stringarray = append(stringarray, "end")
})

sub := myStream.Subscribe(observer.New(onNext, onDone))
<-sub

assert.Exactly(t, []string{"mystring", "mystring", "end"}, stringarray)
}

func TestRepeatWithZeroNtimeOperator(t *testing.T) {
myStream := Repeat("mystring", 0)
stringarray := []string{}

onNext := handlers.NextFunc(func(item interface{}) {
if value, ok := item.(string); ok {
stringarray = append(stringarray, value)
}
})

onDone := handlers.DoneFunc(func() {
stringarray = append(stringarray, "end")
})

sub := myStream.Subscribe(observer.New(onNext, onDone))
<-sub

assert.Exactly(t, []string{"end"}, stringarray)
}

func TestRepeatWithNegativeTimesOperator(t *testing.T) {
myStream := Repeat("mystring", -10)
stringarray := []string{}

onNext := handlers.NextFunc(func(item interface{}) {
if value, ok := item.(string); ok {
stringarray = append(stringarray, value)
}
})

onDone := handlers.DoneFunc(func() {
stringarray = append(stringarray, "end")
})

sub := myStream.Subscribe(observer.New(onNext, onDone))
<-sub

assert.Exactly(t, []string{"end"}, stringarray)
}

0 comments on commit 604078d

Please sign in to comment.