Skip to content

Commit

Permalink
Merge pull request ReactiveX#3480 from akarnokd/SubscribingPerfUpdate
Browse files Browse the repository at this point in the history
1.x: update and bugfix to SubscribingPerf
  • Loading branch information
akarnokd committed Oct 29, 2015
2 parents 5576628 + 6c19a7b commit b905d30
Showing 1 changed file with 71 additions and 12 deletions.
83 changes: 71 additions & 12 deletions src/perf/java/rx/SubscribingPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import rx.functions.Func1;

/**
* Benchmark the cost of subscription and initial request management.
* <p>
Expand All @@ -38,64 +40,121 @@ public class SubscribingPerf {

@Benchmark
public void justDirect(Blackhole bh) {
just.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
DirectSubscriber<Integer> subscriber = new DirectSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.subscribe(subscriber);
}

@Benchmark
public void justStarted(Blackhole bh) {
just.subscribe(new StartedSubscriber<Integer>(Long.MAX_VALUE, bh));
StartedSubscriber<Integer> subscriber = new StartedSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.subscribe(subscriber);
}

@Benchmark
public void justUsual(Blackhole bh) {
just.subscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
UsualSubscriber<Integer> subscriber = new UsualSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.subscribe(subscriber);
}

@Benchmark
public void rangeDirect(Blackhole bh) {
range.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
DirectSubscriber<Integer> subscriber = new DirectSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.subscribe(subscriber);
}

@Benchmark
public void rangeStarted(Blackhole bh) {
range.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
StartedSubscriber<Integer> subscriber = new StartedSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.subscribe(subscriber);
}

@Benchmark
public void rangeUsual(Blackhole bh) {
range.subscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
UsualSubscriber<Integer> subscriber = new UsualSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.subscribe(subscriber);
}

@Benchmark
public void justDirectUnsafe(Blackhole bh) {
just.unsafeSubscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
DirectSubscriber<Integer> subscriber = new DirectSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.unsafeSubscribe(subscriber);
}

@Benchmark
public void justStartedUnsafe(Blackhole bh) {
just.unsafeSubscribe(new StartedSubscriber<Integer>(Long.MAX_VALUE, bh));
StartedSubscriber<Integer> subscriber = new StartedSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.unsafeSubscribe(subscriber);
}

@Benchmark
public void justUsualUnsafe(Blackhole bh) {
just.unsafeSubscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
UsualSubscriber<Integer> subscriber = new UsualSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.unsafeSubscribe(subscriber);
}

@Benchmark
public void rangeDirectUnsafe(Blackhole bh) {
range.unsafeSubscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
DirectSubscriber<Integer> subscriber = new DirectSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.unsafeSubscribe(subscriber);
}

@Benchmark
public void rangeStartedUnsafe(Blackhole bh) {
range.unsafeSubscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
StartedSubscriber<Integer> subscriber = new StartedSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.unsafeSubscribe(subscriber);
}

@Benchmark
public void rangeUsualUnsafe(Blackhole bh) {
range.unsafeSubscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
UsualSubscriber<Integer> subscriber = new UsualSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.unsafeSubscribe(subscriber);
}

@State(Scope.Thread)
public static class Chain {
@Param({"10", "1000", "1000000"})
public int times;

@Param({"1", "2", "3", "4", "5"})
public int maps;

Observable<Integer> source;

@Setup
public void setup() {
Observable<Integer> o = Observable.range(1, times);

for (int i = 0; i < maps; i++) {
o = o.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer v) {
return v + 1;
}
});
}

source = o;
}

@Benchmark
public void mapped(Chain c, Blackhole bh) {
DirectSubscriber<Integer> subscriber = new DirectSubscriber<Integer>(Long.MAX_VALUE, bh);
bh.consume(subscriber);
c.source.subscribe(subscriber);
}
}

static final class DirectSubscriber<T> extends Subscriber<T> {
final long r;
Expand Down

0 comments on commit b905d30

Please sign in to comment.