Skip to content

Commit

Permalink
Merge pull request ReactiveX#888 from benjchristensen/observeOn-bound…
Browse files Browse the repository at this point in the history
…ed-unbounded

Revert Bounded ObserveOn
  • Loading branch information
benjchristensen committed Feb 17, 2014
2 parents b8b28e4 + ac4aed1 commit 18f545a
Show file tree
Hide file tree
Showing 6 changed files with 853 additions and 331 deletions.
22 changes: 2 additions & 20 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperatorObserveOnBounded;
import rx.operators.OperatorScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
Expand Down Expand Up @@ -5148,7 +5149,7 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
}

/**
* Move notifications to the specified {@link Scheduler} one {@code onNext} at a time.
* Move notifications to the specified {@link Scheduler} asynchronously with an unbounded buffer.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
*
Expand All @@ -5162,25 +5163,6 @@ public final Observable<T> observeOn(Scheduler scheduler) {
return lift(new OperatorObserveOn<T>(scheduler));
}

/**
* Move notifications to the specified {@link Scheduler} asynchronously with a buffer of a specified size.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
* <p>
* If the buffer fills to its maximum size...
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param bufferSize
* that will be rounded up to the next power of 2
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
*/
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return lift(new OperatorObserveOn<T>(scheduler, bufferSize));
}

/**
* Filters the items emitted by an Observable, only emitting those of the specified type.
* <p>
Expand Down
231 changes: 28 additions & 203 deletions rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,69 +15,31 @@
*/
package rx.operators;

import java.util.concurrent.Semaphore;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TestScheduler;
import rx.schedulers.TrampolineScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
* Delivers events on the specified Scheduler.
* <p>
* This provides backpressure by blocking the incoming onNext when there is already one in the queue.
* <p>
* This means that at any given time the max number of "onNext" in flight is 3:
* -> 1 being delivered on the Scheduler
* -> 1 in the queue waiting for the Scheduler
* -> 1 blocking on the queue waiting to deliver it
*
* I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler
* can loop and have something to do each time around to optimize for avoiding rescheduling when it
* can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop
* thus if the queue is empty it exits and next time something is added it will reschedule.
* Delivers events on the specified Scheduler asynchronously via an unbounded buffer.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/observeOn.png">
*/
public class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
private final int bufferSize;

/**
*
* @param scheduler
* @param bufferSize
* that will be rounded up to the next power of 2
*/
public OperatorObserveOn(Scheduler scheduler, int bufferSize) {
this.scheduler = scheduler;
this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize);
}

public OperatorObserveOn(Scheduler scheduler) {
this(scheduler, 1);
}

private static int roundToNextPowerOfTwoIfNecessary(int num) {
if ((num & -num) == num) {
return num;
} else {
int result = 1;
while (num != 0)
{
num >>= 1;
result <<= 1;
}
return result;
}
this.scheduler = scheduler;
}

@Override
Expand All @@ -88,19 +50,19 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TestScheduler) {
// this one will deadlock as it is single-threaded and won't run the scheduled
// work until it manually advances, which it won't be able to do as it will block
return child;
} else {
return new ObserveOnSubscriber(child);
}
}

private static Object NULL_SENTINEL = new Object();
private static Object COMPLETE_SENTINEL = new Object();
private static class Sentinel {

private static class ErrorSentinel {
}

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
Expand All @@ -113,7 +75,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
final Subscriber<? super T> observer;
private volatile Scheduler.Inner recursiveScheduler;

private final InterruptibleBlockingQueue<Object> queue = new InterruptibleBlockingQueue<Object>(bufferSize);
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
final AtomicLong counter = new AtomicLong(0);

public ObserveOnSubscriber(Subscriber<? super T> observer) {
Expand All @@ -123,62 +85,29 @@ public ObserveOnSubscriber(Subscriber<? super T> observer) {

@Override
public void onNext(final T t) {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
if (t == null) {
queue.addBlocking(NULL_SENTINEL);
} else {
queue.addBlocking(t);
}
schedule();
} catch (InterruptedException e) {
if (!isUnsubscribed()) {
onError(e);
}
if (t == null) {
queue.offer(NULL_SENTINEL);
} else {
queue.offer(t);
}
schedule();
}

@Override
public void onCompleted() {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
queue.addBlocking(COMPLETE_SENTINEL);
schedule();
} catch (InterruptedException e) {
onError(e);
}
queue.offer(COMPLETE_SENTINEL);
schedule();
}

@Override
public void onError(final Throwable e) {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
queue.addBlocking(new ErrorSentinel(e));
schedule();
} catch (InterruptedException e2) {
// call directly if we can't schedule
observer.onError(e2);
}
queue.offer(new ErrorSentinel(e));
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
// first time through, register a Subscription
// that can interrupt this thread
add(Subscriptions.create(new Action0() {

@Override
public void call() {
// we have to interrupt the parent thread because
// it can be blocked on queue.put
queue.interrupt();
}

}));
add(scheduler.schedule(new Action1<Inner>() {

@Override
Expand Down Expand Up @@ -206,12 +135,14 @@ private void pollQueue() {
do {
Object v = queue.poll();
if (v != null) {
if (v == NULL_SENTINEL) {
observer.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
observer.onCompleted();
} else if (v instanceof ErrorSentinel) {
observer.onError(((ErrorSentinel) v).e);
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
observer.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
observer.onCompleted();
} else if (v instanceof ErrorSentinel) {
observer.onError(((ErrorSentinel) v).e);
}
} else {
observer.onNext((T) v);
}
Expand All @@ -221,110 +152,4 @@ private void pollQueue() {

}

/**
* Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread).
*
* This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows
* unsubscribe behavior when this queue is being used.
*
* @param <E>
*/
private static class InterruptibleBlockingQueue<E> {

private final Semaphore semaphore;
private volatile boolean interrupted = false;

private final E[] buffer;

private AtomicLong tail = new AtomicLong();
private AtomicLong head = new AtomicLong();
private final int capacity;
private final int mask;

@SuppressWarnings("unchecked")
public InterruptibleBlockingQueue(final int size) {
this.semaphore = new Semaphore(size);
this.capacity = size;
this.mask = size - 1;
buffer = (E[]) new Object[size];
}

/**
* Used to unsubscribe and interrupt the producer if blocked in put()
*/
public void interrupt() {
interrupted = true;
semaphore.release();
}

public void addBlocking(final E e) throws InterruptedException {
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
semaphore.acquire();
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
if (e == null) {
throw new IllegalArgumentException("Can not put null");
}

if (offer(e)) {
return;
} else {
throw new IllegalStateException("Queue is full");
}
}

private boolean offer(final E e) {
final long _t = tail.get();
if (_t - head.get() == capacity) {
// queue is full
return false;
}
int index = (int) (_t & mask);
buffer[index] = e;
// move the tail forward
tail.lazySet(_t + 1);

return true;
}

public E poll() {
if (interrupted) {
return null;
}
final long _h = head.get();
if (tail.get() == _h) {
// nothing available
return null;
}
int index = (int) (_h & mask);

// fetch the item
E v = buffer[index];
// allow GC to happen
buffer[index] = null;
// increment and signal we're done
head.lazySet(_h + 1);
if (v != null) {
semaphore.release();
}
return v;
}

public int size()
{
int size;
do
{
final long currentHead = head.get();
final long currentTail = tail.get();
size = (int) (currentTail - currentHead);
} while (size > buffer.length);

return size;
}

}
}
Loading

0 comments on commit 18f545a

Please sign in to comment.