Skip to content

Commit

Permalink
Refine serialize option
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 25, 2020
1 parent 59586b0 commit da054c7
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 75 deletions.
3 changes: 3 additions & 0 deletions assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ loop:
if expectedError == nil {
assert.Equal(t, 0, len(errs))
} else {
if len(errs) == 0 {
assert.FailNow(t, "no error raised", "expected %v", expectedError)
}
assert.Equal(t, expectedError, errs[0])
}
}
Expand Down
151 changes: 77 additions & 74 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
}

if serialized, f := option.isSerialized(); serialized {
firstItemIDCh := make(chan int, 1)
fromCh := make(chan int, 1)
firstItemIDCh := make(chan Item, 1)
fromCh := make(chan Item, 1)
obs := &ObservableImpl{
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
Expand All @@ -170,7 +170,11 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
case <-ctx.Done():
return
case firstItemID := <-firstItemIDCh:
fromCh <- firstItemID
if firstItemID.Error() {
firstItemID.SendContext(ctx, fromCh)
return
}
Of(firstItemID.V.(int)).SendContext(ctx, fromCh)
runParallel(ctx, next, observe, operatorFactory, bypassGather, option, mergedOptions...)
}
}()
Expand Down Expand Up @@ -378,7 +382,7 @@ func runParallel(ctx context.Context, next chan Item, observe <-chan Item, opera
}()
}

func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan int, observe <-chan Item, next chan Item, operatorFactory func() operator, bypassGather bool, option Option, opts ...Option) {
func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan Item, observe <-chan Item, next chan Item, operatorFactory func() operator, bypassGather bool, option Option, opts ...Option) {
go func() {
op := operatorFactory()
stopped := false
Expand All @@ -404,9 +408,10 @@ func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan int,
}
if i.Error() {
op.err(ctx, i, next, operator)
i.SendContext(ctx, notif)
} else {
op.next(ctx, i, next, operator)
notif <- f(i.V)
Of(f(i.V)).SendContext(ctx, notif)
}
}
}
Expand All @@ -415,7 +420,7 @@ func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan int,
}()
}

func (o *ObservableImpl) serialize(fromCh chan int, identifier func(interface{}) int, opts ...Option) Observable {
func (o *ObservableImpl) serialize(fromCh chan Item, identifier func(interface{}) int, opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()

Expand All @@ -426,94 +431,92 @@ func (o *ObservableImpl) serialize(fromCh chan int, identifier func(interface{})
})
status := make(map[int]interface{})
notif := make(chan struct{})
ready := make(chan struct{})

var from int
var counter int64
src := o.Observe(opts...)
go func() {
defer close(ready)
select {
case <-ctx.Done():
close(next)
return
case from := <-fromCh:
case item := <-fromCh:
if item.Error() {
item.SendContext(ctx, next)
close(next)
return
}
from = item.V.(int)
minHeap.Push(from)
counter = int64(from)
}
}()

// Scatter
go func() {
defer close(notif)

// Wait for the from identifier to be set
select {
case <-ctx.Done():
return
case <-ready:
}
// Scatter
go func() {
defer close(notif)

for {
select {
case <-ctx.Done():
return
case item, ok := <-src:
if !ok {
return
}
if item.Error() {
next <- item
return
}
for {
select {
case <-ctx.Done():
return
case item, ok := <-src:
if !ok {
return
}
if item.Error() {
next <- item
return
}

id := identifier(item.V)
mutex.Lock()
if id != from {
minHeap.Push(id)
}
status[id] = item.V
mutex.Unlock()
select {
case <-ctx.Done():
return
case notif <- struct{}{}:
id := identifier(item.V)
mutex.Lock()
if id != from {
minHeap.Push(id)
}
status[id] = item.V
mutex.Unlock()
select {
case <-ctx.Done():
return
case notif <- struct{}{}:
}
}
}
}
}
}()
}()

// Gather
go func() {
defer close(next)
// Gather
go func() {
defer close(next)

for {
select {
case <-ctx.Done():
return
case _, ok := <-notif:
if !ok {
return
}
for {
select {
case <-ctx.Done():
return
case _, ok := <-notif:
if !ok {
return
}

mutex.Lock()
for !minHeap.Empty() {
v, _ := minHeap.Peek()
id := v.(int)
if atomic.LoadInt64(&counter) == int64(id) {
if itemValue, contains := status[id]; contains {
minHeap.Pop()
delete(status, id)
mutex.Unlock()
Of(itemValue).SendContext(ctx, next)
mutex.Lock()
atomic.AddInt64(&counter, 1)
continue
mutex.Lock()
for !minHeap.Empty() {
v, _ := minHeap.Peek()
id := v.(int)
if atomic.LoadInt64(&counter) == int64(id) {
if itemValue, contains := status[id]; contains {
minHeap.Pop()
delete(status, id)
mutex.Unlock()
Of(itemValue).SendContext(ctx, next)
mutex.Lock()
atomic.AddInt64(&counter, 1)
continue
}
}
break
}
mutex.Unlock()
}
break
}
mutex.Unlock()
}
}()
}
}()

Expand Down
11 changes: 10 additions & 1 deletion observable_operator_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Test_Observable_Option_Serialize(t *testing.T) {
idx := 0
<-Range(0, 10000).Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
}, WithBufferedChannel(1), WithCPUPool(), Serialize(func(i interface{}) int {
}, WithBufferedChannel(10), WithCPUPool(), Serialize(func(i interface{}) int {
return i.(int)
})).DoOnNext(func(i interface{}) {
v := i.(int)
Expand All @@ -76,3 +76,12 @@ func Test_Observable_Option_Serialize(t *testing.T) {
idx++
})
}

func Test_Observable_Option_Error(t *testing.T) {
obs := testObservable(errFoo, 2, 3, 4).Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
}, WithBufferedChannel(10), WithCPUPool(), Serialize(func(i interface{}) int {
return i.(int)
}))
Assert(context.Background(), t, obs, IsEmpty(), HasError(errFoo))
}

0 comments on commit da054c7

Please sign in to comment.