diff --git a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java index 4df115b7ae..bbf2bcc48b 100644 --- a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java +++ b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java @@ -166,6 +166,7 @@ void drain() { final AtomicLong requested = sharedProducer; final Subscriber actualSubscriber = this.actual; + final NotificationLite nl = NotificationLite.instance(); for (;;) { @@ -200,13 +201,13 @@ void drain() { long emittedAmount = 0L; boolean unbounded = requestedAmount == Long.MAX_VALUE; - Queue innerQueue = innerSubscriber.queue; + Queue innerQueue = innerSubscriber.queue; boolean innerDone = false; for (;;) { outerDone = innerSubscriber.done; - R v = innerQueue.peek(); + Object v = innerQueue.peek(); empty = v == null; if (outerDone) { @@ -237,7 +238,7 @@ void drain() { innerQueue.poll(); try { - actualSubscriber.onNext(v); + actualSubscriber.onNext(nl.getValue(v)); } catch (Throwable ex) { Exceptions.throwOrReport(ex, actualSubscriber, v); return; @@ -271,7 +272,8 @@ void drain() { static final class EagerInnerSubscriber extends Subscriber { final EagerOuterSubscriber parent; - final Queue queue; + final Queue queue; + final NotificationLite nl; volatile boolean done; Throwable error; @@ -279,19 +281,20 @@ static final class EagerInnerSubscriber extends Subscriber { public EagerInnerSubscriber(EagerOuterSubscriber parent, int bufferSize) { super(); this.parent = parent; - Queue q; + Queue q; if (UnsafeAccess.isUnsafeAvailable()) { - q = new SpscArrayQueue(bufferSize); + q = new SpscArrayQueue(bufferSize); } else { - q = new SpscAtomicArrayQueue(bufferSize); + q = new SpscAtomicArrayQueue(bufferSize); } this.queue = q; + this.nl = NotificationLite.instance(); request(bufferSize); } @Override public void onNext(T t) { - queue.offer(t); + queue.offer(nl.next(t)); parent.drain(); } diff --git a/src/test/java/rx/internal/operators/OperatorEagerConcatMapTest.java b/src/test/java/rx/internal/operators/OperatorEagerConcatMapTest.java index 8c7bd3d9e4..8d2d40bed4 100644 --- a/src/test/java/rx/internal/operators/OperatorEagerConcatMapTest.java +++ b/src/test/java/rx/internal/operators/OperatorEagerConcatMapTest.java @@ -394,4 +394,20 @@ public void call(Integer t) { ts.assertNotCompleted(); Assert.assertEquals(RxRingBuffer.SIZE, count.get()); } + + @Test + public void testInnerNull() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.just(1).concatMapEager(new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.just(null); + } + }).subscribe(ts); + + ts.assertNoErrors(); + ts.assertCompleted(); + ts.assertValue(null); + } }