Skip to content

Commit

Permalink
Fix OperationalConditional
Browse files Browse the repository at this point in the history
All unit tests now pass.
  • Loading branch information
benjchristensen committed Jan 26, 2014
1 parent 9ee0fdc commit 6725754
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func0;
Expand Down Expand Up @@ -231,7 +232,7 @@ public Subscription onSubscribe(Observer<? super T> t1) {
return Subscriptions.empty();
}
if (first) {
SerialSubscription ssub = new SerialSubscription();
MultipleAssignmentSubscription ssub = new MultipleAssignmentSubscription();

ssub.set(source.subscribe(new SourceObserver(t1, ssub)));

Expand All @@ -244,10 +245,10 @@ public Subscription onSubscribe(Observer<? super T> t1) {

/** Observe the source. */
final class SourceObserver extends Observer<T> {
final SerialSubscription cancel;
final MultipleAssignmentSubscription cancel;
final Observer<? super T> observer;

public SourceObserver(Observer<? super T> observer, SerialSubscription cancel) {
public SourceObserver(Observer<? super T> observer, MultipleAssignmentSubscription cancel) {
this.observer = observer;
this.cancel = cancel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ public Boolean call() {

@Test
public void testDoWhileManyTimes() {
fail("deadlocking");
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());

List<Integer> expected = new ArrayList<Integer>(numRecursion * 3);
Expand Down

0 comments on commit 6725754

Please sign in to comment.