Skip to content

Commit

Permalink
Tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
jochasinga committed Dec 28, 2016
1 parent f439f65 commit 08ed395
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
10 changes: 5 additions & 5 deletions single/single.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package single

import (
"time"

"github.com/jochasinga/grx/bang"
"github.com/jochasinga/grx/bases"
"github.com/jochasinga/grx/errors"
Expand Down Expand Up @@ -67,6 +65,8 @@ func (s *Single) Subscribe(handler bases.EventHandler) (bases.Subscriptor, error
nextf = handler
case handlers.ErrFunc:
errf = handler
case handlers.DoneFunc:
return nil, NewError(errors.HandlerFuncError)
case *observer.Observer:
ob = handler
isObserver = true
Expand All @@ -85,9 +85,7 @@ func (s *Single) Subscribe(handler bases.EventHandler) (bases.Subscriptor, error
}
}()

return bases.Subscriptor(&subscription.Subscription{
SubscribeAt: time.Now(),
}), nil
return s.subscriptor.Subscribe(), nil
}

func (s *Single) Unsubscribe() bases.Subscriptor {
Expand All @@ -99,6 +97,8 @@ func (s *Single) Unsubscribe() bases.Subscriptor {
func New(e bases.Emitter) *Single {
s := &Single{
EventStream: make(eventstream.EventStream),
subscriptor: bases.Subscriptor(subscription.DefaultSubscription),
notifier: bang.New(),
}
go func() {
s.EventStream <- e
Expand Down
28 changes: 24 additions & 4 deletions single/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func TestSingleImplementStream(t *testing.T) {
}

func TestCreateSingleWithConstructor(t *testing.T) {
s := New(Number(1))

assert := assert.New(t)

s := New(Number(1))

emitter, err := s.Next()
assert.Nil(err)
assert.NotNil(emitter)
Expand All @@ -48,6 +48,10 @@ func TestCreateSingleWithConstructor(t *testing.T) {
emitter, err = s.Next()
assert.Nil(emitter)
assert.NotNil(err)
assert.Panics(func() {
assert.Implements((*Emitter)(nil), emitter)
})
assert.EqualValues(nil, emitter)
}

func TestSubscribingToObserver(t *testing.T) {
Expand All @@ -67,7 +71,7 @@ func TestSubscribingToObserver(t *testing.T) {
sub, err := s1.Subscribe(ob)
<-time.After(10 * time.Millisecond)
if s, ok := sub.(*subscription.Subscription); ok {
assert.WithinDuration(s.SubscribeAt, time.Now(), 20*time.Millisecond)
assert.WithinDuration(s.SubscribeAt(), time.Now(), 20*time.Millisecond)
}
assert.Nil(err)
assert.Implements((*Subscriptor)(nil), sub)
Expand Down Expand Up @@ -97,7 +101,7 @@ func TestSubscribingToHandlers(t *testing.T) {
sub, err := s1.Subscribe(nextf)
<-time.After(10 * time.Millisecond)
if s, ok := sub.(*subscription.Subscription); ok {
assert.WithinDuration(s.SubscribeAt, time.Now(), 20*time.Millisecond)
assert.WithinDuration(s.SubscribeAt(), time.Now(), 20*time.Millisecond)
}
assert.Nil(err)
assert.Implements((*Subscriptor)(nil), sub)
Expand All @@ -110,3 +114,19 @@ func TestSubscribingToHandlers(t *testing.T) {
assert.Implements((*Subscriptor)(nil), sub)
assert.Equal("text error", errorMessage)
}

func TestSubscribeAndUnsubscribeReturnSameSubscriptor(t *testing.T) {
assert := assert.New(t)
s1 := DefaultSingle

sampleHandler := handlers.NextFunc(func(item Item) {
return
})

sub1, _ := s1.Subscribe(sampleHandler)
<-time.After(10 * time.Millisecond)
sub2 := s1.Unsubscribe()

assert.Equal(sub1.SubscribeAt(), sub2.SubscribeAt())
assert.Equal(sub1.UnsubscribeAt(), sub2.UnsubscribeAt())
}

0 comments on commit 08ed395

Please sign in to comment.