Skip to content

Commit

Permalink
Golint
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Jul 30, 2019
1 parent 43cf1e7 commit e5362ec
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 160 deletions.
3 changes: 1 addition & 2 deletions assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func AssertObservable(t *testing.T, observable Observable, assertions ...RxAsser
}, func(e error) {
err = e
}, nil).Block()

assertObservable(t, ass, got, err)
}

Expand All @@ -213,6 +212,7 @@ func AssertObservableEventually(t *testing.T, observable Observable, timeout tim
chErr <- e
}, nil)
ctxTimeout, ctxTimeoutF := context.WithTimeout(context.Background(), timeout)
defer ctxTimeoutF()

mainLoop:
for {
Expand All @@ -221,7 +221,6 @@ mainLoop:
if open {
got = append(got, item)
} else {
ctxTimeoutF()
break mainLoop
}
case e := <-chErr:
Expand Down
7 changes: 7 additions & 0 deletions common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package rxgo

import (
"time"
)

const wait = 30 * time.Millisecond
3 changes: 3 additions & 0 deletions flatmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,19 @@ func flatObservedSequence(out chan interface{}, o Observable, apply func(interfa
for {
if item, err := it.Next(context.Background()); err == nil {
sequence := apply(item)
// TODO Error handling
sem.Acquire(ctx, 1)
go func() {
defer sem.Release(1)
// TODO Error handling
sequence.Subscribe(emissionObserver).Block()
}()
} else {
break
}
}

// TODO Error handling
sem.Acquire(ctx, int64(maxInParallel))
}

Expand Down
16 changes: 8 additions & 8 deletions flatmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package rxgo
import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/mock"
)

Expand All @@ -17,7 +19,8 @@ func TestFlatMapCompletesWhenSequenceIsEmpty(t *testing.T) {
sequence = sequence.FlatMap(identity, 1)

// when subscribes to the sequence
sequence.Subscribe(emissionObserver.Capture()).Block()
err := sequence.Subscribe(emissionObserver.Capture()).Block()
assert.NoError(t, err)

// then completes without any emission
emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything)
Expand All @@ -37,7 +40,8 @@ func TestFlatMapReturnsSameElementBecauseIdentifyApplied(t *testing.T) {
sequence = sequence.FlatMap(identity, 1)

// when subscribes to the sequence
sequence.Subscribe(emissionObserver.Capture()).Block()
err := sequence.Subscribe(emissionObserver.Capture()).Block()
assert.NoError(t, err)

// then completes with emission of the same element
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
Expand All @@ -60,7 +64,8 @@ func TestFlatMapReturnsSliceElements(t *testing.T) {
sequence = sequence.FlatMap(flattenThreeElementSlice, 1)

// when subscribes to the sequence
sequence.Subscribe(emissionObserver.Capture()).Block()
err := sequence.Subscribe(emissionObserver.Capture()).Block()
assert.NoError(t, err)

// then completes with emission of flatten elements
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
Expand Down Expand Up @@ -96,11 +101,6 @@ func TestFlatMapReturnsSliceElements(t *testing.T) {
// assert.Equal(t, uint(1), requestedMaxInParallel)
//}

var (
someElement = "some element"
someSequence = Just(someElement)
)

func identity(el interface{}) Observable {
return Just(el)
}
Expand Down
15 changes: 8 additions & 7 deletions mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package rxgo
import (
"bufio"
"context"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
)

const signalCh = byte(0)
Expand Down Expand Up @@ -40,20 +41,20 @@ type mockType struct {
index int
}

func (m mockContext) Deadline() (deadline time.Time, ok bool) {
func (m *mockContext) Deadline() (deadline time.Time, ok bool) {
panic("implement me")
}

func (m mockContext) Done() <-chan struct{} {
func (m *mockContext) Done() <-chan struct{} {
outputs := m.Called()
return outputs.Get(0).(chan struct{})
}

func (m mockContext) Err() error {
func (m *mockContext) Err() error {
panic("implement me")
}

func (m mockContext) Value(key interface{}) interface{} {
func (m *mockContext) Value(key interface{}) interface{} {
panic("implement me")
}

Expand Down Expand Up @@ -184,7 +185,7 @@ func causality(in string) ([]Observable, []context.Context) {
t := tasks[i]
index := t.index

if t.context == true {
if t.context {
ctx := contexts[index]
notif := make(chan struct{}, 1)
lastObservableType = -1
Expand Down
4 changes: 2 additions & 2 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (o *observable) BufferWithCount(count, skip int) Observable {
return
}

buffer := make([]interface{}, count, count)
buffer := make([]interface{}, count)
iCount := 0
iSkip := 0
it := o.iterable.Iterator(context.Background())
Expand All @@ -472,7 +472,7 @@ func (o *observable) BufferWithCount(count, skip int) Observable {

if iSkip == skip { // Send current buffer
out <- buffer
buffer = make([]interface{}, count, count)
buffer = make([]interface{}, count)
iCount = 0
iSkip = 0
}
Expand Down
Loading

0 comments on commit e5362ec

Please sign in to comment.