Skip to content

Commit

Permalink
Merge pull request ReactiveX#3645 from akarnokd/AmbStateFix1x
Browse files Browse the repository at this point in the history
1.x: fix Amb sharing the choice among all subscribers
  • Loading branch information
akarnokd committed Jan 25, 2016
2 parents 4e31a5f + 0f8caf3 commit 5ab00f9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/main/java/rx/internal/operators/OnSubscribeAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,15 +353,15 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
//give default access instead of private as a micro-optimization
//for access from anonymous classes below
final Iterable<? extends Observable<? extends T>> sources;
final Selection<T> selection = new Selection<T>();
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;

private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
this.sources = sources;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
final Selection<T> selection = new Selection<T>();
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;

//setup unsubscription of all the subscribers to the sources
subscriber.add(Subscriptions.create(new Action0() {
Expand Down
23 changes: 22 additions & 1 deletion src/test/java/rx/internal/operators/OnSubscribeAmbTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,5 +288,26 @@ public void call(Object t) {
}).ambWith(Observable.just(2)).toBlocking().single();
assertEquals(1, result);
}


@Test(timeout = 1000)
public void testMultipleUse() {
TestSubscriber<Long> ts1 = new TestSubscriber<Long>();
TestSubscriber<Long> ts2 = new TestSubscriber<Long>();

Observable<Long> amb = Observable.timer(100, TimeUnit.MILLISECONDS).ambWith(Observable.timer(200, TimeUnit.MILLISECONDS));

amb.subscribe(ts1);
amb.subscribe(ts2);

ts1.awaitTerminalEvent();
ts2.awaitTerminalEvent();

ts1.assertValue(0L);
ts1.assertCompleted();
ts1.assertNoErrors();

ts2.assertValue(0L);
ts2.assertCompleted();
ts2.assertNoErrors();
}
}

0 comments on commit 5ab00f9

Please sign in to comment.