From 604078dfeb18039ef5a709eceeb85db35583ed6c Mon Sep 17 00:00:00 2001 From: Arek Czarnik Date: Fri, 17 Feb 2017 13:50:03 +0100 Subject: [PATCH] first proposal of RX "repeat create operation" --- observable/observable.go | 32 ++++++++ observable/observable_test.go | 138 ++++++++++++++++++++++++++++------ 2 files changed, 149 insertions(+), 21 deletions(-) diff --git a/observable/observable.go b/observable/observable.go index ad91cf9d..31b4eebf 100644 --- a/observable/observable.go +++ b/observable/observable.go @@ -239,6 +239,38 @@ func Interval(term chan struct{}, timeout time.Duration) Observable { return Observable(source) } +// Interval creates an Observable emitting ntimes or infinity the given item. +func Repeat(item interface{}, ntimes ...int) Observable { + source := make(chan interface{}) + // this is the infinity case no ntime parameter is given + if len(ntimes) == 0 { + go func() { + for { + source <- item + } + close(source) + }() + return Observable(source) + } + + // this repeate ntime + if len(ntimes) > 0 { + count := ntimes[0] + if count <= 0 { + return Empty() + } + go func() { + for i := 0; i < count; i++ { + source <- item + } + close(source) + }() + return Observable(source) + } + + return Empty() +} + // Range creates an Observable that emits a particular range of sequential integers. func Range(start, end int) Observable { source := make(chan interface{}) diff --git a/observable/observable_test.go b/observable/observable_test.go index 2defcaf8..64f036f5 100644 --- a/observable/observable_test.go +++ b/observable/observable_test.go @@ -445,7 +445,7 @@ func TestObservableFirst(t *testing.T) { nums = append(nums, num) } }) - + sub := stream2.Subscribe(onNext) <-sub @@ -513,59 +513,59 @@ func TestObservableLastWithEmpty(t *testing.T) { } func TestObservableDistinct(t *testing.T) { - items := []interface{}{1,2,2,1,3} + items := []interface{}{1, 2, 2, 1, 3} it, err := iterable.New(items) if err != nil { t.Fail() } - + stream1 := From(it) - + id := func(item interface{}) interface{} { - return item + return item } - + stream2 := stream1.Distinct(id) - + nums := []int{} onNext := handlers.NextFunc(func(item interface{}) { if num, ok := item.(int); ok { nums = append(nums, num) } }) - + sub := stream2.Subscribe(onNext) - <- sub - - assert.Exactly(t, []int{1,2,3}, nums) + <-sub + + assert.Exactly(t, []int{1, 2, 3}, nums) } func TestObservableDistinctUntilChanged(t *testing.T) { - items := []interface{}{1,2,2,1,3} + items := []interface{}{1, 2, 2, 1, 3} it, err := iterable.New(items) if err != nil { t.Fail() } - + stream1 := From(it) - + id := func(item interface{}) interface{} { - return item + return item } - + stream2 := stream1.DistinctUntilChanged(id) - + nums := []int{} onNext := handlers.NextFunc(func(item interface{}) { if num, ok := item.(int); ok { nums = append(nums, num) } }) - + sub := stream2.Subscribe(onNext) - <- sub - - assert.Exactly(t, []int{1,2,1,3}, nums) + <-sub + + assert.Exactly(t, []int{1, 2, 1, 3}, nums) } func TestObservableScanWithIntegers(t *testing.T) { @@ -647,3 +647,99 @@ func TestObservableScanWithString(t *testing.T) { assert.Exactly(t, expected, words) } + +func TestRepeatInfinityOperator(t *testing.T) { + myStream := Repeat("mystring") + + item, err := myStream.Next() + + if err != nil { + assert.Fail(t, "fail to emit next item", err) + } + + if value, ok := item.(string); ok { + assert.Equal(t, value, "mystring") + } else { + assert.Fail(t, "fail to emit next item", err) + } +} + +func TestRepeatNtimeOperator(t *testing.T) { + myStream := Repeat("mystring", 2) + stringarray := []string{} + + onNext := handlers.NextFunc(func(item interface{}) { + if value, ok := item.(string); ok { + stringarray = append(stringarray, value) + } + }) + + onDone := handlers.DoneFunc(func() { + stringarray = append(stringarray, "end") + }) + + sub := myStream.Subscribe(observer.New(onNext, onDone)) + <-sub + + assert.Exactly(t, []string{"mystring", "mystring", "end"}, stringarray) +} + +func TestRepeatNtimeMultiVariadicOperator(t *testing.T) { + myStream := Repeat("mystring", 2, 2, 3, 4, 5, 6, 7) + stringarray := []string{} + + onNext := handlers.NextFunc(func(item interface{}) { + if value, ok := item.(string); ok { + stringarray = append(stringarray, value) + } + }) + + onDone := handlers.DoneFunc(func() { + stringarray = append(stringarray, "end") + }) + + sub := myStream.Subscribe(observer.New(onNext, onDone)) + <-sub + + assert.Exactly(t, []string{"mystring", "mystring", "end"}, stringarray) +} + +func TestRepeatWithZeroNtimeOperator(t *testing.T) { + myStream := Repeat("mystring", 0) + stringarray := []string{} + + onNext := handlers.NextFunc(func(item interface{}) { + if value, ok := item.(string); ok { + stringarray = append(stringarray, value) + } + }) + + onDone := handlers.DoneFunc(func() { + stringarray = append(stringarray, "end") + }) + + sub := myStream.Subscribe(observer.New(onNext, onDone)) + <-sub + + assert.Exactly(t, []string{"end"}, stringarray) +} + +func TestRepeatWithNegativeTimesOperator(t *testing.T) { + myStream := Repeat("mystring", -10) + stringarray := []string{} + + onNext := handlers.NextFunc(func(item interface{}) { + if value, ok := item.(string); ok { + stringarray = append(stringarray, value) + } + }) + + onDone := handlers.DoneFunc(func() { + stringarray = append(stringarray, "end") + }) + + sub := myStream.Subscribe(observer.New(onNext, onDone)) + <-sub + + assert.Exactly(t, []string{"end"}, stringarray) +}