Skip to content

Commit

Permalink
Merge pull request ReactiveX#3138 from akarnokd/RangePerf
Browse files Browse the repository at this point in the history
Range overhead reduction.
  • Loading branch information
akarnokd committed Aug 12, 2015
2 parents 6362dfe + 9643d94 commit 054ba58
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 56 deletions.
112 changes: 68 additions & 44 deletions src/main/java/rx/internal/operators/OnSubscribeRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.Producer;
import rx.Subscriber;

/**
* Emit ints from start to end inclusive.
Expand All @@ -39,13 +38,13 @@ public void call(final Subscriber<? super Integer> o) {
o.setProducer(new RangeProducer(o, start, end));
}

private static final class RangeProducer implements Producer {
private static final class RangeProducer extends AtomicLong implements Producer {
/** */
private static final long serialVersionUID = 4114392207069098388L;

private final Subscriber<? super Integer> o;
// accessed by REQUESTED_UPDATER
private volatile long requested;
private static final AtomicLongFieldUpdater<RangeProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(RangeProducer.class, "requested");
private long index;
private final int end;
private long index;

private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
this.o = o;
Expand All @@ -55,54 +54,79 @@ private RangeProducer(Subscriber<? super Integer> o, int start, int end) {

@Override
public void request(long n) {
if (requested == Long.MAX_VALUE) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
if (n == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
// fast-path without backpressure
for (long i = index; i <= end; i++) {
fastpath();
} else if (n > 0L) {
long c = BackpressureUtils.getAndAddRequest(this, n);
if (c == 0L) {
// backpressure is requested
slowpath(n);
}
}
}

/**
*
*/
void slowpath(long r) {
long idx = index;
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
long fs = end - idx + 1;
long e = Math.min(fs, r);
final boolean complete = fs <= r;

fs = e + idx;
final Subscriber<? super Integer> o = this.o;

for (long i = idx; i != fs; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext((int) i);
}
if (!o.isUnsubscribed()) {

if (complete) {
if (o.isUnsubscribed()) {
return;
}
o.onCompleted();
return;
}
} else if (n > 0) {
// backpressure is requested
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER,this, n);
if (_c == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
long r = requested;
long idx = index;
long numLeft = end - idx + 1;
long e = Math.min(numLeft, r);
boolean completeOnFinish = numLeft <= r;
long stopAt = e + idx;
for (long i = idx; i < stopAt; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext((int) i);
}
index = stopAt;

if (completeOnFinish) {
o.onCompleted();
return;
}
if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
// we're done emitting the number requested so return
return;
}
}

idx = fs;
index = fs;

r = addAndGet(-e);
if (r == 0L) {
// we're done emitting the number requested so return
return;
}
}
}

/**
*
*/
void fastpath() {
final long end = this.end + 1L;
final Subscriber<? super Integer> o = this.o;
for (long i = index; i != end; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext((int) i);
}
if (!o.isUnsubscribed()) {
o.onCompleted();
}
}
}
Expand Down
17 changes: 5 additions & 12 deletions src/perf/java/rx/operators/OperatorRangePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import rx.Observable;
import rx.Subscriber;
import rx.*;
import rx.internal.operators.OnSubscribeRange;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
Expand All @@ -50,7 +43,7 @@ public static class InputUsingRequest {

@Setup
public void setup(final Blackhole bh) {
observable = Observable.range(0, size);
observable = Observable.create(new OnSubscribeRange(0, size));
this.bh = bh;
}

Expand Down Expand Up @@ -98,7 +91,7 @@ public static class InputWithoutRequest {

@Setup
public void setup(final Blackhole bh) {
observable = Observable.range(0, size);
observable = Observable.create(new OnSubscribeRange(0, size));
this.bh = bh;

}
Expand Down
19 changes: 19 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeRangeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,23 @@ public void onNext(Integer t) {
}});
assertTrue(completed.get());
}

@Test(timeout = 1000)
public void testNearMaxValueWithoutBackpressure() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Observable.range(Integer.MAX_VALUE - 1, 2).subscribe(ts);

ts.assertCompleted();
ts.assertNoErrors();
ts.assertValues(Integer.MAX_VALUE - 1, Integer.MAX_VALUE);
}
@Test(timeout = 1000)
public void testNearMaxValueWithBackpressure() {
TestSubscriber<Integer> ts = TestSubscriber.create(3);
Observable.range(Integer.MAX_VALUE - 1, 2).subscribe(ts);

ts.assertCompleted();
ts.assertNoErrors();
ts.assertValues(Integer.MAX_VALUE - 1, Integer.MAX_VALUE);
}
}

0 comments on commit 054ba58

Please sign in to comment.