Skip to content

Commit

Permalink
TakeWhile: don't unsubscribe downstream.
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Feb 11, 2015
1 parent 0abfb74 commit a128c5e
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 16 deletions.
17 changes: 14 additions & 3 deletions src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,24 @@ public abstract class Subscriber<T> implements Observer<T>, Subscription {
private long requested = Long.MIN_VALUE; // default to not set

protected Subscriber() {
this.op = null;
this.cs = new SubscriptionList();
this(null, false);
}

protected Subscriber(Subscriber<?> op) {
this(op, true);
}
/**
* Construct a subscriber by using the other subscriber for backpressure
* and optionally sharing the underlying subscriptions list.
* <p>To retain the chaining of subscribers, the caller should add the
* created instance to the op via {@code add()}.
*
* @param op the other subscriber
* @param shareSubscriptions should the subscription list in op shared with this instance?
*/
protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
this.op = op;
this.cs = op.cs;
this.cs = shareSubscriptions && op != null ? op.cs : new SubscriptionList();
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/rx/internal/operators/OperatorTakeWhile.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public OperatorTakeWhile(Func2<? super T, ? super Integer, Boolean> predicate) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {
Subscriber<T> s = new Subscriber<T>(subscriber, false) {

private int counter = 0;

Expand Down Expand Up @@ -86,6 +86,8 @@ public void onError(Throwable e) {
}

};
subscriber.add(s);
return s;
}

}
63 changes: 51 additions & 12 deletions src/test/java/rx/internal/operators/OperatorTakeWhileTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@

import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import org.junit.Test;
import java.util.Arrays;

import rx.Observable;
import org.junit.*;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.observers.TestSubscriber;
import rx.subjects.*;

public class OperatorTakeWhileTest {

Expand Down Expand Up @@ -222,4 +217,48 @@ public void run() {
System.out.println("done starting TestObservable thread");
}
}

@Test
public void testBackpressure() {
Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer t1) {
return t1 < 100;
}
});
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onStart() {
request(5);
}
};

source.subscribe(ts);

ts.assertNoErrors();
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));

ts.requestMore(5);

ts.assertNoErrors();
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}

@Test
public void testNoUnsubscribeDownstream() {
Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer t1) {
return t1 < 2;
}
});
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.unsafeSubscribe(ts);

ts.assertNoErrors();
ts.assertReceivedOnNext(Arrays.asList(1));

Assert.assertFalse("Unsubscribed!", ts.isUnsubscribed());
}
}

0 comments on commit a128c5e

Please sign in to comment.