Skip to content

Commit

Permalink
Remove single observer
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Jul 30, 2019
1 parent c6e0b98 commit ade4bd6
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 267 deletions.
16 changes: 14 additions & 2 deletions assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,13 @@ mainLoop:
func AssertSingle(t *testing.T, single Single, assertions ...RxAssertion) {
ass := parseAssertions(assertions...)

v, err := single.Subscribe(nil).Block()
var v interface{}
var err error
single.Subscribe(NewObserver(NextFunc(func(i interface{}) {
v = i
}), ErrFunc(func(e error) {
err = e
}))).Block()

checkHasValue, value := ass.hasValueFunc()
if checkHasValue {
Expand Down Expand Up @@ -270,7 +276,13 @@ func AssertSingle(t *testing.T, single Single, assertions ...RxAssertion) {
func AssertOptionalSingle(t *testing.T, optionalSingle OptionalSingle, assertions ...RxAssertion) {
ass := parseAssertions(assertions...)

v, err := optionalSingle.Subscribe(nil).Block()
var v interface{}
var err error
optionalSingle.Subscribe(NewObserver(NextFunc(func(i interface{}) {
v = i
}), ErrFunc(func(e error) {
err = e
}))).Block()

if err != nil {
assert.Fail(t, "error while retrieving OptionalSingle results")
Expand Down
62 changes: 17 additions & 45 deletions observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,24 +722,22 @@ func TestElementAt(t *testing.T) {
got := 0
just := Just(0, 1, 2, 3, 4)
single := just.ElementAt(2)
_, err := single.Subscribe(NextFunc(func(i interface{}) {
single.Subscribe(NextFunc(func(i interface{}) {
switch i := i.(type) {
case int:
got = i
}
})).Block()
assert.NoError(t, err)
assert.Equal(t, 2, got)
}

func TestElementAtWithError(t *testing.T) {
got := 0
just := Just(0, 1, 2, 3, 4)
single := just.ElementAt(10)
_, err := single.Subscribe(ErrFunc(func(error) {
single.Subscribe(ErrFunc(func(error) {
got = 10
})).Block()
assert.Error(t, err)
assert.Equal(t, 10, got)
}

Expand All @@ -755,12 +753,9 @@ func TestObservableReduce(t *testing.T) {
}

var got Optional
_, err := stream1.Reduce(add).Subscribe(NextFunc(func(i interface{}) {
stream1.Reduce(add).Subscribe(NextFunc(func(i interface{}) {
got = i.(Optional)
})).Block()
if err != nil {
t.Fail()
}
assert.False(t, got.IsEmpty())
assert.Exactly(t, Of(14), got)
}
Expand All @@ -777,12 +772,9 @@ func TestObservableReduceEmpty(t *testing.T) {
stream := Empty()

var got Optional
_, err := stream.Reduce(add).Subscribe(NextFunc(func(i interface{}) {
stream.Reduce(add).Subscribe(NextFunc(func(i interface{}) {
got = i.(Optional)
})).Block()
if err != nil {
t.Fail()
}
assert.True(t, got.IsEmpty())
}

Expand All @@ -792,12 +784,9 @@ func TestObservableReduceNil(t *testing.T) {
return nil
}
var got Optional
_, err := stream.Reduce(nilReduce).Subscribe(NextFunc(func(i interface{}) {
stream.Reduce(nilReduce).Subscribe(NextFunc(func(i interface{}) {
got = i.(Optional)
})).Block()
if err != nil {
t.Fail()
}
assert.False(t, got.IsEmpty())
g, err := got.Get()
assert.Nil(t, err)
Expand All @@ -806,43 +795,28 @@ func TestObservableReduceNil(t *testing.T) {

func TestObservableCount(t *testing.T) {
stream := Just(1, 2, 3, "foo", "bar", errors.New("error"))
count, err := stream.Count().Subscribe(nil).Block()
if err != nil {
t.Fail()
}
assert.Exactly(t, int64(6), count)
obs := stream.Count()
AssertSingle(t, obs, HasValue(int64(6)))
}

func TestObservableFirstOrDefault(t *testing.T) {
v, err := Empty().FirstOrDefault(7).Subscribe(nil).Block()
if err != nil {
t.Fail()
}
assert.Exactly(t, 7, v)
obs := Empty().FirstOrDefault(7)
AssertSingle(t, obs, HasValue(7))
}

func TestObservableFirstOrDefaultWithValue(t *testing.T) {
v, err := Just(0, 1, 2).FirstOrDefault(7).Subscribe(nil).Block()
if err != nil {
t.Fail()
}
assert.Exactly(t, 0, v)
obs := Just(0, 1, 2).FirstOrDefault(7)
AssertSingle(t, obs, HasValue(0))
}

func TestObservableLastOrDefault(t *testing.T) {
v, err := Empty().LastOrDefault(7).Subscribe(nil).Block()
if err != nil {
t.Fail()
}
assert.Exactly(t, 7, v)
obs := Empty().LastOrDefault(7)
AssertSingle(t, obs, HasValue(7))
}

func TestObservableLastOrDefaultWithValue(t *testing.T) {
v, err := Just(0, 1, 3).LastOrDefault(7).Subscribe(nil).Block()
if err != nil {
t.Fail()
}
assert.Exactly(t, 3, v)
obs := Just(0, 1, 3).LastOrDefault(7)
AssertSingle(t, obs, HasValue(3))
}

func TestObservableSkipWhile(t *testing.T) {
Expand Down Expand Up @@ -1024,18 +998,16 @@ func TestContain(t *testing.T) {

var got1, got2 bool

_, err := Just(1, 2, 3).Contains(predicate).
Just(1, 2, 3).Contains(predicate).
Subscribe(NextFunc(func(i interface{}) {
got1 = i.(bool)
})).Block()
assert.NoError(t, err)
assert.True(t, got1)

_, err = Just(1, 5, 3).Contains(predicate).
Just(1, 5, 3).Contains(predicate).
Subscribe(NextFunc(func(i interface{}) {
got2 = i.(bool)
})).Block()
assert.NoError(t, err)
assert.False(t, got2)
}

Expand Down
40 changes: 18 additions & 22 deletions single.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ type Single interface {
Iterable
Filter(apply Predicate) OptionalSingle
Map(apply Function) Single
Subscribe(handler EventHandler, opts ...Option) SingleObserver
Subscribe(handler EventHandler, opts ...Option) Observer
}

type OptionalSingle interface {
Subscribe(handler EventHandler, opts ...Option) SingleObserver
Subscribe(handler EventHandler, opts ...Option) Observer
}

type single struct {
Expand Down Expand Up @@ -46,8 +46,8 @@ func newOptionalSingleFrom(opt Optional) OptionalSingle {
}

// CheckSingleEventHandler checks the underlying type of an EventHandler.
func CheckSingleEventHandler(handler EventHandler) SingleObserver {
return NewSingleObserver(handler)
func CheckSingleEventHandler(handler EventHandler) Observer {
return NewObserver(handler)
}

func newColdSingle(f func(chan interface{})) Single {
Expand Down Expand Up @@ -107,44 +107,40 @@ func (s *single) Map(apply Function) Single {
return newColdSingle(f)
}

func (s *single) Subscribe(handler EventHandler, opts ...Option) SingleObserver {
func (s *single) Subscribe(handler EventHandler, opts ...Option) Observer {
ob := CheckSingleEventHandler(handler)

go func() {
it := s.iterable.Iterator(context.Background())
for {
if item, err := it.Next(context.Background()); err == nil {
switch item := item.(type) {
case error:
ob.OnError(item)

// Record the error and break the loop.
return
default:
ob.OnSuccess(item)
}
} else {
break
if item, err := it.Next(context.Background()); err == nil {
switch item := item.(type) {
case error:
ob.OnError(item)
default:
ob.OnNext(item)
ob.Dispose()
}
} else {
ob.OnDone()
}
}()

return ob
}

func (s *optionalSingle) Subscribe(handler EventHandler, opts ...Option) SingleObserver {
func (s *optionalSingle) Subscribe(handler EventHandler, opts ...Option) Observer {
ob := CheckSingleEventHandler(handler)

// TODO Improve
go func() {
for item := range s.ch {
switch item := item.(type) {
case error:
ob.OnError(item)

// Record the error and break the loop.
return
default:
ob.OnSuccess(item)
ob.OnNext(item)
ob.Dispose()
}
}
}()
Expand Down
9 changes: 3 additions & 6 deletions single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
func TestSingleFilterNotMatching(t *testing.T) {
got := 0

_, err := Just(1, 2, 3).ElementAt(1).Filter(func(i interface{}) bool {
Just(1, 2, 3).ElementAt(1).Filter(func(i interface{}) bool {
switch i := i.(type) {
case int:
if i == 2 {
Expand All @@ -26,14 +26,13 @@ func TestSingleFilterNotMatching(t *testing.T) {
}
}
})).Block()
assert.NoError(t, err)
assert.Equal(t, 2, got)
}

func TestSingleFilterMatching(t *testing.T) {
got := 0

_, err := Just(1, 2, 3).ElementAt(1).Filter(func(i interface{}) bool {
Just(1, 2, 3).ElementAt(1).Filter(func(i interface{}) bool {
switch i := i.(type) {
case int:
if i == 2 {
Expand All @@ -50,19 +49,17 @@ func TestSingleFilterMatching(t *testing.T) {
}
}
})).Block()
assert.NoError(t, err)
assert.Equal(t, 0, got)
}

func TestSingleMap(t *testing.T) {
got := 0

_, err := Just(1, 2, 3).ElementAt(1).Map(func(i interface{}) interface{} {
Just(1, 2, 3).ElementAt(1).Map(func(i interface{}) interface{} {
return i
}).Subscribe(NextFunc(func(i interface{}) {
got = i.(int) + 10
})).Block()
assert.NoError(t, err)
assert.Equal(t, 12, got)
}

Expand Down
Loading

0 comments on commit ade4bd6

Please sign in to comment.