Skip to content

Commit

Permalink
Added bounded buffering capability to SubscribeOn
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Feb 12, 2014
1 parent dc4ee52 commit dade7e1
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 60 deletions.
17 changes: 11 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7060,27 +7060,32 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
* on the specified {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
* @see #subscribeOn(rx.Scheduler, int)
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler, false));
}
/**
* Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler}
* and allows buffering the events emitted from the source in the time gap between the original and
* actual subscription.
* and allows buffering some events emitted from the source in the time gap between the original and
* actual subscription, and any excess events will block the source until the actual subscription happens.
* <p>
* This overload should help mitigate issues when subscribing to a PublishSubject (and derivatives
* such as GroupedObservable in operator groupBy) and events fired between the original and actual subscriptions
* are lost.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/subscribeOn.png">
*
* @param scheduler
* the {@link Scheduler} to perform subscription and unsubscription actions on
* @param dontLoseEvents indicate that the operator should buffer events emitted in the time gap
* between the original and actual subscription and replay it to Observers
* @param bufferSize the number of events to buffer before blocking the source while in the time gap,
* negative value indicates an unlimited buffer
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
* on the specified {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
*/
public final Observable<T> subscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler, dontLoseEvents));
public final Observable<T> subscribeOn(Scheduler scheduler, int bufferSize) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler, true, bufferSize));
}

/**
Expand Down
75 changes: 54 additions & 21 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
public class BufferUntilSubscriber<T> extends Subscriber<T> {
/** The actual subscriber. */
private final Subscriber<? super T> actual;
/** The mutual exclusion for the duration of the replay. */
private final Object gate = new Object();
/** Queued events. */
private final Queue<Object> queue = new LinkedList<Object>();
/** Indicate the pass-through mode. */
private volatile boolean passthroughMode;
/** Protect mode transition. */
private final Object gate = new Object();
/** The buffered items. */
private final Queue<Object> queue = new LinkedList<Object>();
/** The queue capacity. */
private final int capacity;
/** Null sentinel (in case queue type is changed). */
private static final Object NULL_SENTINEL = new Object();
/** Complete sentinel. */
Expand All @@ -51,21 +53,25 @@ public ErrorSentinel(Throwable t) {
}
/**
* Constructor that wraps the actual subscriber and shares its subscription.
* @param actual
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
* @param actual
*/
public BufferUntilSubscriber(Subscriber<? super T> actual) {
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
this.capacity = capacity;
}
/**
* Constructor that wraps the actual subscriber and uses the given composite
* subscription.
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
* @param actual
* @param cs
*/
public BufferUntilSubscriber(Subscriber<? super T> actual, CompositeSubscription cs) {
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual, CompositeSubscription cs) {
super(cs);
this.actual = actual;
this.capacity = capacity;
}

/**
Expand Down Expand Up @@ -96,26 +102,30 @@ public void enterPassthroughMode() {
throw new NullPointerException();
}
}
/* Test artificial back-pressure.
try {
TimeUnit.SECONDS.sleep(2);
} catch (Throwable t) {
}
*/
passthroughMode = true;
gate.notifyAll();
}
}
}
}

@Override
public void onNext(T t) {
if (!passthroughMode) {
synchronized (gate) {
if (!passthroughMode) {
queue.offer(t != null ? t : NULL_SENTINEL);
return;
if (capacity < 0 || queue.size() < capacity) {
queue.offer(t != null ? t : NULL_SENTINEL);
return;
}
try {
while (!passthroughMode) {
gate.wait();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
actual.onError(ex);
return;
}
}
}
}
Expand All @@ -127,8 +137,19 @@ public void onError(Throwable e) {
if (!passthroughMode) {
synchronized (gate) {
if (!passthroughMode) {
queue.offer(new ErrorSentinel(e));
return;
if (capacity < 0 || queue.size() < capacity) {
queue.offer(new ErrorSentinel(e));
return;
}
try {
while (!passthroughMode) {
gate.wait();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
actual.onError(ex);
return;
}
}
}
}
Expand All @@ -140,11 +161,23 @@ public void onCompleted() {
if (!passthroughMode) {
synchronized (gate) {
if (!passthroughMode) {
queue.offer(COMPLETE_SENTINEL);
return;
if (capacity < 0 || queue.size() < capacity) {
queue.offer(COMPLETE_SENTINEL);
return;
}
try {
while (!passthroughMode) {
gate.wait();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
actual.onError(ex);
return;
}
}
}
}
actual.onCompleted();
}

}
24 changes: 17 additions & 7 deletions rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,22 @@ public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
* the actual subscription time should not get lost.
*/
private final boolean dontLoseEvents;

/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
private final int bufferSize;
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
this(scheduler, dontLoseEvents, -1);
}
/**
* Construct a SubscribeOn operator.
* @param scheduler the target scheduler
* @param dontLoseEvents indicate that events should be buffered until the actual subscription happens
* @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
* block the source. -1 indicates an unbounded buffer
*/
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) {
this.scheduler = scheduler;
this.dontLoseEvents = dontLoseEvents;
this.bufferSize = bufferSize;
}

@Override
Expand All @@ -60,20 +72,19 @@ public void onError(Throwable e) {
subscriber.onError(e);
}
boolean checkNeedBuffer(Observable<?> o) {
return (o instanceof GroupedObservable<?, ?>)
return dontLoseEvents || ((o instanceof GroupedObservable<?, ?>)
|| (o instanceof PublishSubject<?>)
// || (o instanceof BehaviorSubject<?, ?>)
;
);
}
@Override
public void onNext(final Observable<T> o) {
if (dontLoseEvents || checkNeedBuffer(o)) {
if (checkNeedBuffer(o)) {
final CompositeSubscription cs = new CompositeSubscription();
subscriber.add(cs);
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(subscriber, new CompositeSubscription());
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber, new CompositeSubscription());
o.subscribe(bus);
scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
cs.add(Subscriptions.create(new Action0() {
Expand All @@ -89,7 +100,6 @@ public void call(final Inner inner) {
}));
bus.enterPassthroughMode();
}

});
return;
}
Expand Down
84 changes: 58 additions & 26 deletions rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Ignore;

import org.junit.Test;

Expand All @@ -39,6 +42,7 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.Timestamped;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
Expand Down Expand Up @@ -153,7 +157,7 @@ public void call(
assertEquals(1, observer.getOnCompletedEvents().size());
}

static class SlowScheduler extends Scheduler {
public static class SlowScheduler extends Scheduler {
final Scheduler actual;
final long delay;
final TimeUnit unit;
Expand All @@ -168,35 +172,14 @@ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) {

@Override
public Subscription schedule(final Action1<Scheduler.Inner> action) {
final CompositeSubscription cs = new CompositeSubscription();
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
cs.add(mas);
mas.set(actual.schedule(new Action1<Inner>() {

@Override
public void call(Inner t1) {
// cs.delete(mas);
cs.add(actual.schedule(action, delay, unit));
}

}));
return cs;
return actual.schedule(action, delay, unit);
}

@Override
public Subscription schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit delayUnit) {
final CompositeSubscription cs = new CompositeSubscription();
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
cs.add(mas);
mas.set(actual.schedule(new Action1<Inner>() {
@Override
public void call(Inner t1) {
// cs.delete(mas);
long nanos = unit.toNanos(delay) + delayUnit.toNanos(delayTime);
cs.add(actual.schedule(action, nanos, TimeUnit.NANOSECONDS));
}
}));
return cs;
TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit;
long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit);
return actual.schedule(action, t, common);
}
}

Expand Down Expand Up @@ -338,4 +321,53 @@ public void call(String s) {
System.out.println("Results: " + results);
assertEquals(6, results.size());
}
void testBoundedBufferingWithSize(int size) throws Exception {
Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS);

final List<Long> deltas = Collections.synchronizedList(new ArrayList<Long>());

Subscription s = timer.timestamp().subscribeOn(
new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size).map(new Func1<Timestamped<Long>, Long>() {
@Override
public Long call(Timestamped<Long> t1) {
long v = System.currentTimeMillis() - t1.getTimestampMillis();
return v;
}
}).doOnNext(new Action1<Long>() {
@Override
public void call(Long t1) {
deltas.add(t1);
}
}).subscribe();

Thread.sleep(2050);

s.unsubscribe();

if (deltas.size() < size + 1) {
fail("To few items in deltas: " + deltas);
}
for (int i = 0; i < size + 1; i++) {
if (deltas.get(i) < 500) {
fail(i + "th item arrived too early: " + deltas);
}
}
for (int i = size + 1; i < deltas.size(); i++) {
if (deltas.get(i) >= 500) {
fail(i + "th item arrived too late: " + deltas);
}
}
}
@Test(timeout = 5000)
public void testBoundedBufferingOfZero() throws Exception {
testBoundedBufferingWithSize(0);
}
@Test(timeout = 5000)
public void testBoundedBufferingOfOne() throws Exception {
testBoundedBufferingWithSize(1);
}
@Test(timeout = 5000)
public void testBoundedBufferingOfTwo() throws Exception {
testBoundedBufferingWithSize(2);
}
}

0 comments on commit dade7e1

Please sign in to comment.