Skip to content

Commit

Permalink
Merge pull request ReactiveX#2804 from akarnokd/Perf0225
Browse files Browse the repository at this point in the history
ObserveOn throughput enhancements
  • Loading branch information
benjchristensen committed Mar 6, 2015
2 parents 8758fdd + 94b53d6 commit ecbd27d
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 80 deletions.
102 changes: 57 additions & 45 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.Queue;
import java.util.concurrent.atomic.*;

import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.*;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.util.RxRingBuffer;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TrampolineScheduler;
import rx.internal.util.*;
import rx.internal.util.unsafe.*;
import rx.schedulers.*;

/**
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
Expand Down Expand Up @@ -64,16 +61,15 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
/** Observe through individual queue per observer. */
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> child;
private final Scheduler.Worker recursiveScheduler;
private final ScheduledUnsubscribe scheduledUnsubscribe;
final Scheduler.Worker recursiveScheduler;
final ScheduledUnsubscribe scheduledUnsubscribe;
final NotificationLite<T> on = NotificationLite.instance();

private final RxRingBuffer queue = RxRingBuffer.getSpscInstance();
private boolean completed = false;
private boolean failure = false;
final Queue<Object> queue;
volatile boolean completed = false;
volatile boolean failure = false;

@SuppressWarnings("unused")
private volatile long requested = 0;
volatile long requested = 0;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");

Expand All @@ -82,12 +78,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");

volatile Throwable error;

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
}
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
child.add(scheduledUnsubscribe);
child.setProducer(new Producer() {

Expand All @@ -113,10 +116,8 @@ public void onNext(final T t) {
if (isUnsubscribed() || completed) {
return;
}
try {
queue.onNext(t);
} catch (MissingBackpressureException e) {
onError(e);
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
Expand All @@ -127,8 +128,10 @@ public void onCompleted() {
if (isUnsubscribed() || completed) {
return;
}
if (error != null) {
return;
}
completed = true;
queue.onCompleted();
schedule();
}

Expand All @@ -137,53 +140,64 @@ public void onError(final Throwable e) {
if (isUnsubscribed() || completed) {
return;
}
if (error != null) {
return;
}
error = e;
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
unsubscribe();
completed = true;
// mark failure so the polling thread will skip onNext still in the queue
completed = true;
failure = true;
queue.onError(e);
schedule();
}

protected void schedule() {
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
recursiveScheduler.schedule(new Action0() {
final Action0 action = new Action0() {

@Override
public void call() {
pollQueue();
}
@Override
public void call() {
pollQueue();
}

});
};

protected void schedule() {
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
recursiveScheduler.schedule(action);
}
}

// only execute this from schedule()
private void pollQueue() {
void pollQueue() {
int emitted = 0;
do {
/*
* Set to 1 otherwise it could have grown very large while in the last poll loop
* and then we can end up looping all those times again here before exiting even once we've drained
*/
COUNTER_UPDATER.set(this, 1);
counter = 1;

// middle:
while (!scheduledUnsubscribe.isUnsubscribed()) {
if (failure) {
// special handling to short-circuit an error propagation
Object o = queue.poll();
// completed so we will skip onNext if they exist and only emit terminal events
if (on.isError(o)) {
// only emit error
on.accept(child, o);
// we have emitted a terminal event so return (exit the loop we're in)
child.onError(error);
return;
} else {
if (requested == 0 && completed && queue.isEmpty()) {
child.onCompleted();
return;
}
} else {
if (REQUESTED.getAndDecrement(this) != 0) {
Object o = queue.poll();
if (o == null) {
if (completed) {
if (failure) {
child.onError(error);
} else {
child.onCompleted();
}
return;
}
// nothing in queue
REQUESTED.incrementAndGet(this);
break;
Expand Down Expand Up @@ -213,12 +227,10 @@ static final class ScheduledUnsubscribe implements Subscription {
final Scheduler.Worker worker;
volatile int once;
static final AtomicIntegerFieldUpdater<ScheduledUnsubscribe> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once");
final RxRingBuffer queue;
volatile boolean unsubscribed = false;

public ScheduledUnsubscribe(Scheduler.Worker worker, RxRingBuffer queue) {
public ScheduledUnsubscribe(Scheduler.Worker worker) {
this.worker = worker;
this.queue = queue;
}

@Override
Expand Down
38 changes: 21 additions & 17 deletions src/main/java/rx/internal/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package rx.internal.schedulers;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import java.util.concurrent.*;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import rx.*;
import rx.functions.Action0;
import rx.internal.util.*;
import rx.subscriptions.*;

public class EventLoopsScheduler extends Scheduler {
/** Manages a fixed number of workers. */
Expand Down Expand Up @@ -95,7 +92,9 @@ public Subscription scheduleDirect(Action0 action) {
}

private static class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final SubscriptionList serial = new SubscriptionList();
private final CompositeSubscription timed = new CompositeSubscription();
private final SubscriptionList both = new SubscriptionList(serial, timed);
private final PoolWorker poolWorker;

EventLoopWorker(PoolWorker poolWorker) {
Expand All @@ -105,28 +104,33 @@ private static class EventLoopWorker extends Scheduler.Worker {

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
both.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
return both.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction s = poolWorker.scheduleActual(action, 0, null);

serial.add(s);
s.addParent(serial);

return s;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, timed);

ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
Expand Down
35 changes: 33 additions & 2 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import rx.*;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.internal.util.RxThreadFactory;
import rx.internal.util.*;
import rx.plugins.*;
import rx.subscriptions.Subscriptions;
import rx.subscriptions.*;

/**
* @warn class description missing
Expand Down Expand Up @@ -174,6 +174,37 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time

return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);

Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);

return run;
}

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);

Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);

return run;
}

@Override
public void unsubscribe() {
Expand Down
Loading

0 comments on commit ecbd27d

Please sign in to comment.