Skip to content

Commit

Permalink
Merge pull request ReactiveX#3630 from akarnokd/ConcatMapEagerInnerNP…
Browse files Browse the repository at this point in the history
…EFix1x

1.x: ConcatMapEager allow nulls from inner Observables.
  • Loading branch information
zsxwing committed Jan 22, 2016
2 parents ef1c509 + cef0b91 commit 90df5da
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
19 changes: 11 additions & 8 deletions src/main/java/rx/internal/operators/OperatorEagerConcatMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ void drain() {

final AtomicLong requested = sharedProducer;
final Subscriber<? super R> actualSubscriber = this.actual;
final NotificationLite<R> nl = NotificationLite.instance();

for (;;) {

Expand Down Expand Up @@ -200,13 +201,13 @@ void drain() {
long emittedAmount = 0L;
boolean unbounded = requestedAmount == Long.MAX_VALUE;

Queue<R> innerQueue = innerSubscriber.queue;
Queue<Object> innerQueue = innerSubscriber.queue;
boolean innerDone = false;


for (;;) {
outerDone = innerSubscriber.done;
R v = innerQueue.peek();
Object v = innerQueue.peek();
empty = v == null;

if (outerDone) {
Expand Down Expand Up @@ -237,7 +238,7 @@ void drain() {
innerQueue.poll();

try {
actualSubscriber.onNext(v);
actualSubscriber.onNext(nl.getValue(v));
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, actualSubscriber, v);
return;
Expand Down Expand Up @@ -271,27 +272,29 @@ void drain() {

static final class EagerInnerSubscriber<T> extends Subscriber<T> {
final EagerOuterSubscriber<?, T> parent;
final Queue<T> queue;
final Queue<Object> queue;
final NotificationLite<T> nl;

volatile boolean done;
Throwable error;

public EagerInnerSubscriber(EagerOuterSubscriber<?, T> parent, int bufferSize) {
super();
this.parent = parent;
Queue<T> q;
Queue<Object> q;
if (UnsafeAccess.isUnsafeAvailable()) {
q = new SpscArrayQueue<T>(bufferSize);
q = new SpscArrayQueue<Object>(bufferSize);
} else {
q = new SpscAtomicArrayQueue<T>(bufferSize);
q = new SpscAtomicArrayQueue<Object>(bufferSize);
}
this.queue = q;
this.nl = NotificationLite.instance();
request(bufferSize);
}

@Override
public void onNext(T t) {
queue.offer(t);
queue.offer(nl.next(t));
parent.drain();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,20 @@ public void call(Integer t) {
ts.assertNotCompleted();
Assert.assertEquals(RxRingBuffer.SIZE, count.get());
}

@Test
public void testInnerNull() {
TestSubscriber<Object> ts = TestSubscriber.create();

Observable.just(1).concatMapEager(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(null);
}
}).subscribe(ts);

ts.assertNoErrors();
ts.assertCompleted();
ts.assertValue(null);
}
}

0 comments on commit 90df5da

Please sign in to comment.