Skip to content

Commit

Permalink
OperatorAny needs to handle backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrjacobs committed Jul 22, 2014
1 parent 044cd53 commit 70d0308
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
return new Subscriber<T>(child) {
boolean hasElements;
boolean done;

@Override
public void onNext(T t) {
hasElements = true;
Expand All @@ -48,6 +49,9 @@ public void onNext(T t) {
child.onNext(!returnOnEmpty);
child.onCompleted();
unsubscribe();
} else {
// if we drop values we must replace them upstream as downstream won't receive and request more
request(1);
}
}

Expand All @@ -68,7 +72,7 @@ public void onCompleted() {
child.onCompleted();
}
}

};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -27,6 +28,8 @@
import rx.functions.Func1;
import rx.functions.Functions;

import java.util.Arrays;

public class OperatorAnyTest {

@Test
Expand Down Expand Up @@ -197,4 +200,16 @@ public Boolean call(Integer t1) {
verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testWithFollowingFirst() {
Observable<Integer> o = Observable.from(Arrays.asList(1, 3, 5, 6));
Observable<Boolean> anyEven = o.exists(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer i) {
return i % 2 == 0;
}
});
assertTrue(anyEven.toBlocking().first());
}
}

0 comments on commit 70d0308

Please sign in to comment.